use anyhow::Context as _;
use async_graphql::{
connection::{query, Connection, Edge, OpaqueCursor},
Context, Description, Enum, Object, Union, ID,
};
use chrono::{DateTime, Utc};
use mas_data_model::Device;
use mas_storage::{
app_session::AppSessionFilter,
compat::{CompatSessionFilter, CompatSsoLoginFilter, CompatSsoLoginRepository},
oauth2::{OAuth2SessionFilter, OAuth2SessionRepository},
upstream_oauth2::{UpstreamOAuthLinkFilter, UpstreamOAuthLinkRepository},
user::{BrowserSessionFilter, BrowserSessionRepository, UserEmailFilter, UserEmailRepository},
Pagination, RepositoryAccess,
};
use super::{
compat_sessions::{CompatSessionType, CompatSsoLogin},
matrix::MatrixUser,
BrowserSession, CompatSession, Cursor, NodeCursor, NodeType, OAuth2Session,
PreloadedTotalCount, SessionState, UpstreamOAuth2Link,
};
use crate::graphql::{state::ContextExt, DateFilter};
#[derive(Description)]
pub struct User(pub mas_data_model::User);
impl From<mas_data_model::User> for User {
fn from(v: mas_data_model::User) -> Self {
Self(v)
}
}
impl From<mas_data_model::BrowserSession> for User {
fn from(v: mas_data_model::BrowserSession) -> Self {
Self(v.user)
}
}
#[Object(use_type_description)]
impl User {
pub async fn id(&self) -> ID {
NodeType::User.id(self.0.id)
}
async fn username(&self) -> &str {
&self.0.username
}
pub async fn created_at(&self) -> DateTime<Utc> {
self.0.created_at
}
pub async fn locked_at(&self) -> Option<DateTime<Utc>> {
self.0.locked_at
}
pub async fn can_request_admin(&self) -> bool {
self.0.can_request_admin
}
async fn matrix(&self, ctx: &Context<'_>) -> Result<MatrixUser, async_graphql::Error> {
let state = ctx.state();
let conn = state.homeserver_connection();
Ok(MatrixUser::load(conn, &self.0.username).await?)
}
async fn primary_email(
&self,
ctx: &Context<'_>,
) -> Result<Option<UserEmail>, async_graphql::Error> {
let state = ctx.state();
let mut repo = state.repository().await?;
let user_email = repo.user_email().get_primary(&self.0).await?.map(UserEmail);
repo.cancel().await?;
Ok(user_email)
}
async fn compat_sso_logins(
&self,
ctx: &Context<'_>,
#[graphql(desc = "Returns the elements in the list that come after the cursor.")]
after: Option<String>,
#[graphql(desc = "Returns the elements in the list that come before the cursor.")]
before: Option<String>,
#[graphql(desc = "Returns the first *n* elements from the list.")] first: Option<i32>,
#[graphql(desc = "Returns the last *n* elements from the list.")] last: Option<i32>,
) -> Result<Connection<Cursor, CompatSsoLogin, PreloadedTotalCount>, async_graphql::Error> {
let state = ctx.state();
let mut repo = state.repository().await?;
query(
after,
before,
first,
last,
|after, before, first, last| async move {
let after_id = after
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::CompatSsoLogin))
.transpose()?;
let before_id = before
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::CompatSsoLogin))
.transpose()?;
let pagination = Pagination::try_new(before_id, after_id, first, last)?;
let filter = CompatSsoLoginFilter::new().for_user(&self.0);
let page = repo.compat_sso_login().list(filter, pagination).await?;
let count = if ctx.look_ahead().field("totalCount").exists() {
Some(repo.compat_sso_login().count(filter).await?)
} else {
None
};
repo.cancel().await?;
let mut connection = Connection::with_additional_fields(
page.has_previous_page,
page.has_next_page,
PreloadedTotalCount(count),
);
connection.edges.extend(page.edges.into_iter().map(|u| {
Edge::new(
OpaqueCursor(NodeCursor(NodeType::CompatSsoLogin, u.id)),
CompatSsoLogin(u),
)
}));
Ok::<_, async_graphql::Error>(connection)
},
)
.await
}
#[allow(clippy::too_many_arguments)]
async fn compat_sessions(
&self,
ctx: &Context<'_>,
#[graphql(name = "state", desc = "List only sessions with the given state.")]
state_param: Option<SessionState>,
#[graphql(name = "type", desc = "List only sessions with the given type.")]
type_param: Option<CompatSessionType>,
#[graphql(
name = "lastActive",
desc = "List only sessions with a last active time is between the given bounds."
)]
last_active: Option<DateFilter>,
#[graphql(desc = "Returns the elements in the list that come after the cursor.")]
after: Option<String>,
#[graphql(desc = "Returns the elements in the list that come before the cursor.")]
before: Option<String>,
#[graphql(desc = "Returns the first *n* elements from the list.")] first: Option<i32>,
#[graphql(desc = "Returns the last *n* elements from the list.")] last: Option<i32>,
) -> Result<Connection<Cursor, CompatSession, PreloadedTotalCount>, async_graphql::Error> {
let state = ctx.state();
let mut repo = state.repository().await?;
let last_active = last_active.unwrap_or_default();
query(
after,
before,
first,
last,
|after, before, first, last| async move {
let after_id = after
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::CompatSession))
.transpose()?;
let before_id = before
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::CompatSession))
.transpose()?;
let pagination = Pagination::try_new(before_id, after_id, first, last)?;
let filter = CompatSessionFilter::new().for_user(&self.0);
let filter = match state_param {
Some(SessionState::Active) => filter.active_only(),
Some(SessionState::Finished) => filter.finished_only(),
None => filter,
};
let filter = match type_param {
Some(CompatSessionType::SsoLogin) => filter.sso_login_only(),
Some(CompatSessionType::Unknown) => filter.unknown_only(),
None => filter,
};
let filter = match last_active.after {
Some(after) => filter.with_last_active_after(after),
None => filter,
};
let filter = match last_active.before {
Some(before) => filter.with_last_active_before(before),
None => filter,
};
let page = repo.compat_session().list(filter, pagination).await?;
let count = if ctx.look_ahead().field("totalCount").exists() {
Some(repo.compat_session().count(filter).await?)
} else {
None
};
repo.cancel().await?;
let mut connection = Connection::with_additional_fields(
page.has_previous_page,
page.has_next_page,
PreloadedTotalCount(count),
);
connection
.edges
.extend(page.edges.into_iter().map(|(session, sso_login)| {
Edge::new(
OpaqueCursor(NodeCursor(NodeType::CompatSession, session.id)),
CompatSession::new(session).with_loaded_sso_login(sso_login),
)
}));
Ok::<_, async_graphql::Error>(connection)
},
)
.await
}
async fn browser_sessions(
&self,
ctx: &Context<'_>,
#[graphql(name = "state", desc = "List only sessions in the given state.")]
state_param: Option<SessionState>,
#[graphql(
name = "lastActive",
desc = "List only sessions with a last active time is between the given bounds."
)]
last_active: Option<DateFilter>,
#[graphql(desc = "Returns the elements in the list that come after the cursor.")]
after: Option<String>,
#[graphql(desc = "Returns the elements in the list that come before the cursor.")]
before: Option<String>,
#[graphql(desc = "Returns the first *n* elements from the list.")] first: Option<i32>,
#[graphql(desc = "Returns the last *n* elements from the list.")] last: Option<i32>,
) -> Result<Connection<Cursor, BrowserSession, PreloadedTotalCount>, async_graphql::Error> {
let state = ctx.state();
let mut repo = state.repository().await?;
let last_active = last_active.unwrap_or_default();
query(
after,
before,
first,
last,
|after, before, first, last| async move {
let after_id = after
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::BrowserSession))
.transpose()?;
let before_id = before
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::BrowserSession))
.transpose()?;
let pagination = Pagination::try_new(before_id, after_id, first, last)?;
let filter = BrowserSessionFilter::new().for_user(&self.0);
let filter = match state_param {
Some(SessionState::Active) => filter.active_only(),
Some(SessionState::Finished) => filter.finished_only(),
None => filter,
};
let filter = match last_active.after {
Some(after) => filter.with_last_active_after(after),
None => filter,
};
let filter = match last_active.before {
Some(before) => filter.with_last_active_before(before),
None => filter,
};
let page = repo.browser_session().list(filter, pagination).await?;
let count = if ctx.look_ahead().field("totalCount").exists() {
Some(repo.browser_session().count(filter).await?)
} else {
None
};
repo.cancel().await?;
let mut connection = Connection::with_additional_fields(
page.has_previous_page,
page.has_next_page,
PreloadedTotalCount(count),
);
connection.edges.extend(page.edges.into_iter().map(|u| {
Edge::new(
OpaqueCursor(NodeCursor(NodeType::BrowserSession, u.id)),
BrowserSession(u),
)
}));
Ok::<_, async_graphql::Error>(connection)
},
)
.await
}
async fn emails(
&self,
ctx: &Context<'_>,
#[graphql(name = "state", desc = "List only emails in the given state.")]
state_param: Option<UserEmailState>,
#[graphql(desc = "Returns the elements in the list that come after the cursor.")]
after: Option<String>,
#[graphql(desc = "Returns the elements in the list that come before the cursor.")]
before: Option<String>,
#[graphql(desc = "Returns the first *n* elements from the list.")] first: Option<i32>,
#[graphql(desc = "Returns the last *n* elements from the list.")] last: Option<i32>,
) -> Result<Connection<Cursor, UserEmail, PreloadedTotalCount>, async_graphql::Error> {
let state = ctx.state();
let mut repo = state.repository().await?;
query(
after,
before,
first,
last,
|after, before, first, last| async move {
let after_id = after
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::UserEmail))
.transpose()?;
let before_id = before
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::UserEmail))
.transpose()?;
let pagination = Pagination::try_new(before_id, after_id, first, last)?;
let filter = UserEmailFilter::new().for_user(&self.0);
let filter = match state_param {
Some(UserEmailState::Pending) => filter.pending_only(),
Some(UserEmailState::Confirmed) => filter.verified_only(),
None => filter,
};
let page = repo.user_email().list(filter, pagination).await?;
let count = if ctx.look_ahead().field("totalCount").exists() {
Some(repo.user_email().count(filter).await?)
} else {
None
};
repo.cancel().await?;
let mut connection = Connection::with_additional_fields(
page.has_previous_page,
page.has_next_page,
PreloadedTotalCount(count),
);
connection.edges.extend(page.edges.into_iter().map(|u| {
Edge::new(
OpaqueCursor(NodeCursor(NodeType::UserEmail, u.id)),
UserEmail(u),
)
}));
Ok::<_, async_graphql::Error>(connection)
},
)
.await
}
#[allow(clippy::too_many_arguments)]
async fn oauth2_sessions(
&self,
ctx: &Context<'_>,
#[graphql(name = "state", desc = "List only sessions in the given state.")]
state_param: Option<SessionState>,
#[graphql(desc = "List only sessions for the given client.")] client: Option<ID>,
#[graphql(
name = "lastActive",
desc = "List only sessions with a last active time is between the given bounds."
)]
last_active: Option<DateFilter>,
#[graphql(desc = "Returns the elements in the list that come after the cursor.")]
after: Option<String>,
#[graphql(desc = "Returns the elements in the list that come before the cursor.")]
before: Option<String>,
#[graphql(desc = "Returns the first *n* elements from the list.")] first: Option<i32>,
#[graphql(desc = "Returns the last *n* elements from the list.")] last: Option<i32>,
) -> Result<Connection<Cursor, OAuth2Session, PreloadedTotalCount>, async_graphql::Error> {
let state = ctx.state();
let mut repo = state.repository().await?;
let last_active = last_active.unwrap_or_default();
query(
after,
before,
first,
last,
|after, before, first, last| async move {
let after_id = after
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::OAuth2Session))
.transpose()?;
let before_id = before
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::OAuth2Session))
.transpose()?;
let pagination = Pagination::try_new(before_id, after_id, first, last)?;
let client = if let Some(id) = client {
let id = NodeType::OAuth2Client.extract_ulid(&id)?;
let client = repo
.oauth2_client()
.lookup(id)
.await?
.ok_or(async_graphql::Error::new("Unknown client ID"))?;
Some(client)
} else {
None
};
let filter = OAuth2SessionFilter::new().for_user(&self.0);
let filter = match state_param {
Some(SessionState::Active) => filter.active_only(),
Some(SessionState::Finished) => filter.finished_only(),
None => filter,
};
let filter = match client.as_ref() {
Some(client) => filter.for_client(client),
None => filter,
};
let filter = match last_active.after {
Some(after) => filter.with_last_active_after(after),
None => filter,
};
let filter = match last_active.before {
Some(before) => filter.with_last_active_before(before),
None => filter,
};
let page = repo.oauth2_session().list(filter, pagination).await?;
let count = if ctx.look_ahead().field("totalCount").exists() {
Some(repo.oauth2_session().count(filter).await?)
} else {
None
};
repo.cancel().await?;
let mut connection = Connection::with_additional_fields(
page.has_previous_page,
page.has_next_page,
PreloadedTotalCount(count),
);
connection.edges.extend(page.edges.into_iter().map(|s| {
Edge::new(
OpaqueCursor(NodeCursor(NodeType::OAuth2Session, s.id)),
OAuth2Session(s),
)
}));
Ok::<_, async_graphql::Error>(connection)
},
)
.await
}
async fn upstream_oauth2_links(
&self,
ctx: &Context<'_>,
#[graphql(desc = "Returns the elements in the list that come after the cursor.")]
after: Option<String>,
#[graphql(desc = "Returns the elements in the list that come before the cursor.")]
before: Option<String>,
#[graphql(desc = "Returns the first *n* elements from the list.")] first: Option<i32>,
#[graphql(desc = "Returns the last *n* elements from the list.")] last: Option<i32>,
) -> Result<Connection<Cursor, UpstreamOAuth2Link, PreloadedTotalCount>, async_graphql::Error>
{
let state = ctx.state();
let mut repo = state.repository().await?;
query(
after,
before,
first,
last,
|after, before, first, last| async move {
let after_id = after
.map(|x: OpaqueCursor<NodeCursor>| {
x.extract_for_type(NodeType::UpstreamOAuth2Link)
})
.transpose()?;
let before_id = before
.map(|x: OpaqueCursor<NodeCursor>| {
x.extract_for_type(NodeType::UpstreamOAuth2Link)
})
.transpose()?;
let pagination = Pagination::try_new(before_id, after_id, first, last)?;
let filter = UpstreamOAuthLinkFilter::new()
.for_user(&self.0)
.enabled_providers_only();
let page = repo.upstream_oauth_link().list(filter, pagination).await?;
let count = if ctx.look_ahead().field("totalCount").exists() {
Some(repo.upstream_oauth_link().count(filter).await?)
} else {
None
};
repo.cancel().await?;
let mut connection = Connection::with_additional_fields(
page.has_previous_page,
page.has_next_page,
PreloadedTotalCount(count),
);
connection.edges.extend(page.edges.into_iter().map(|s| {
Edge::new(
OpaqueCursor(NodeCursor(NodeType::UpstreamOAuth2Link, s.id)),
UpstreamOAuth2Link::new(s),
)
}));
Ok::<_, async_graphql::Error>(connection)
},
)
.await
}
#[allow(clippy::too_many_arguments)]
async fn app_sessions(
&self,
ctx: &Context<'_>,
#[graphql(name = "state", desc = "List only sessions in the given state.")]
state_param: Option<SessionState>,
#[graphql(name = "device", desc = "List only sessions for the given device.")]
device_param: Option<String>,
#[graphql(
name = "lastActive",
desc = "List only sessions with a last active time is between the given bounds."
)]
last_active: Option<DateFilter>,
#[graphql(
name = "browserSession",
desc = "List only sessions for the given session."
)]
browser_session_param: Option<ID>,
#[graphql(desc = "Returns the elements in the list that come after the cursor.")]
after: Option<String>,
#[graphql(desc = "Returns the elements in the list that come before the cursor.")]
before: Option<String>,
#[graphql(desc = "Returns the first *n* elements from the list.")] first: Option<i32>,
#[graphql(desc = "Returns the last *n* elements from the list.")] last: Option<i32>,
) -> Result<Connection<Cursor, AppSession, PreloadedTotalCount>, async_graphql::Error> {
let state = ctx.state();
let requester = ctx.requester();
let mut repo = state.repository().await?;
let last_active = last_active.unwrap_or_default();
query(
after,
before,
first,
last,
|after, before, first, last| async move {
let after_id = after
.map(|x: OpaqueCursor<NodeCursor>| {
x.extract_for_types(&[NodeType::OAuth2Session, NodeType::CompatSession])
})
.transpose()?;
let before_id = before
.map(|x: OpaqueCursor<NodeCursor>| {
x.extract_for_types(&[NodeType::OAuth2Session, NodeType::CompatSession])
})
.transpose()?;
let pagination = Pagination::try_new(before_id, after_id, first, last)?;
let device_param = device_param.map(Device::try_from).transpose()?;
let filter = AppSessionFilter::new().for_user(&self.0);
let filter = match state_param {
Some(SessionState::Active) => filter.active_only(),
Some(SessionState::Finished) => filter.finished_only(),
None => filter,
};
let filter = match device_param.as_ref() {
Some(device) => filter.for_device(device),
None => filter,
};
let maybe_session = match browser_session_param {
Some(id) => {
let id = NodeType::BrowserSession
.extract_ulid(&id)
.context("Invalid browser_session parameter")?;
let Some(session) = repo
.browser_session()
.lookup(id)
.await?
.filter(|u| requester.is_owner_or_admin(u))
else {
return Ok(Connection::with_additional_fields(
false,
false,
PreloadedTotalCount(Some(0)),
));
};
Some(session)
}
None => None,
};
let filter = match maybe_session {
Some(ref session) => filter.for_browser_session(session),
None => filter,
};
let filter = match last_active.after {
Some(after) => filter.with_last_active_after(after),
None => filter,
};
let filter = match last_active.before {
Some(before) => filter.with_last_active_before(before),
None => filter,
};
let page = repo.app_session().list(filter, pagination).await?;
let count = if ctx.look_ahead().field("totalCount").exists() {
Some(repo.app_session().count(filter).await?)
} else {
None
};
repo.cancel().await?;
let mut connection = Connection::with_additional_fields(
page.has_previous_page,
page.has_next_page,
PreloadedTotalCount(count),
);
connection
.edges
.extend(page.edges.into_iter().map(|s| match s {
mas_storage::app_session::AppSession::Compat(session) => Edge::new(
OpaqueCursor(NodeCursor(NodeType::CompatSession, session.id)),
AppSession::CompatSession(Box::new(CompatSession::new(*session))),
),
mas_storage::app_session::AppSession::OAuth2(session) => Edge::new(
OpaqueCursor(NodeCursor(NodeType::OAuth2Session, session.id)),
AppSession::OAuth2Session(Box::new(OAuth2Session(*session))),
),
}));
Ok::<_, async_graphql::Error>(connection)
},
)
.await
}
}
#[derive(Union)]
pub enum AppSession {
CompatSession(Box<CompatSession>),
OAuth2Session(Box<OAuth2Session>),
}
#[derive(Description)]
pub struct UserEmail(pub mas_data_model::UserEmail);
#[Object(use_type_description)]
impl UserEmail {
pub async fn id(&self) -> ID {
NodeType::UserEmail.id(self.0.id)
}
async fn email(&self) -> &str {
&self.0.email
}
pub async fn created_at(&self) -> DateTime<Utc> {
self.0.created_at
}
async fn confirmed_at(&self) -> Option<DateTime<Utc>> {
self.0.confirmed_at
}
}
#[derive(Enum, Copy, Clone, Eq, PartialEq)]
pub enum UserEmailState {
Pending,
Confirmed,
}