use std::future::Future;
use eyeball::Subscriber;
use futures_util::FutureExt as _;
use indexmap::IndexMap;
#[cfg(test)]
use matrix_sdk::crypto::{DecryptionSettings, RoomEventDecryptionResult, TrustRequirement};
use matrix_sdk::{
crypto::types::events::CryptoContextInfo, deserialized_responses::TimelineEvent,
event_cache::paginator::PaginableRoom, BoxFuture, Result, Room,
};
use matrix_sdk_base::{latest_event::LatestEvent, RoomInfo};
use ruma::{
events::{
fully_read::FullyReadEventContent,
receipt::{Receipt, ReceiptThread, ReceiptType},
AnyMessageLikeEventContent, AnySyncTimelineEvent,
},
push::{PushConditionRoomCtx, Ruleset},
serde::Raw,
EventId, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomVersionId, UserId,
};
use tracing::{debug, error};
use super::{Profile, RedactError, TimelineBuilder};
use crate::timeline::{self, pinned_events_loader::PinnedEventsRoom, Timeline};
/// Extension trait adding timeline construction helpers to a room type.
///
/// This file implements it for [`Room`].
pub trait RoomExt {
/// Returns a future resolving to a [`Timeline`] for this room, or to a
/// [`timeline::Error`] if the timeline could not be built.
///
/// The [`Room`] implementation in this file is shorthand for
/// `self.timeline_builder().build().await`.
fn timeline(&self) -> impl Future<Output = Result<Timeline, timeline::Error>> + Send;
/// Returns a [`TimelineBuilder`] for this room, so the caller can adjust
/// the configuration before building the timeline.
///
/// The [`Room`] implementation in this file creates the builder with
/// `Timeline::builder(self)` and calls `track_read_marker_and_receipts()`
/// on it before returning it.
fn timeline_builder(&self) -> TimelineBuilder;
}
impl RoomExt for Room {
    /// Builds a [`Timeline`] for this room from the default builder returned
    /// by [`RoomExt::timeline_builder`].
    async fn timeline(&self) -> Result<Timeline, timeline::Error> {
        let builder = self.timeline_builder();
        builder.build().await
    }

    /// Creates a [`TimelineBuilder`] for this room with read marker and
    /// receipt tracking switched on.
    fn timeline_builder(&self) -> TimelineBuilder {
        let builder = Timeline::builder(self);
        builder.track_read_marker_and_receipts()
    }
}
/// Source of the room data and room actions used by the timeline code.
///
/// This file implements it for [`Room`]. The per-method notes below describe
/// what that implementation does; other implementors are not visible here.
pub(super) trait RoomDataProvider:
Clone + Send + Sync + 'static + PaginableRoom + PinnedEventsRoom
{
/// Returns the provider's own user ID.
fn own_user_id(&self) -> &UserId;
/// Returns the version of the room.
///
/// The [`Room`] implementation reads it from a clone of the room info using
/// `room_version_or_default()`.
fn room_version(&self) -> RoomVersionId;
/// Returns a future resolving to the room's [`CryptoContextInfo`].
fn crypto_context_info(&self) -> BoxFuture<'_, CryptoContextInfo>;
/// Looks up the [`Profile`] of `user_id` in this room.
///
/// In the [`Room`] implementation, a known member yields its display name,
/// ambiguity flag and avatar URL. An unknown member yields
/// `Profile::default()` when the room reports its members as synced and
/// `None` otherwise. A lookup error is logged and also yields `None`.
fn profile_from_user_id<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<Profile>>;
/// Builds a [`Profile`] from the sender information carried by
/// `latest_event`.
///
/// The [`Room`] implementation returns `None` when the event has no sender
/// profile.
fn profile_from_latest_event(&self, latest_event: &LatestEvent) -> Option<Profile>;
/// Loads the receipt of the given type and thread for `user_id`, as the
/// receipted event's ID together with the receipt.
///
/// The [`Room`] implementation logs load errors and returns `None` for
/// them.
fn load_user_receipt<'a>(
&'a self,
receipt_type: ReceiptType,
thread: ReceiptThread,
user_id: &'a UserId,
) -> BoxFuture<'a, Option<(OwnedEventId, Receipt)>>;
/// Loads the receipts attached to `event_id`, keyed by user ID.
///
/// The [`Room`] implementation loads `ReceiptType::Read` receipts for the
/// unthreaded and main threads and merges them, main-thread entries
/// replacing unthreaded ones for the same user. Load errors are logged and
/// treated as "no receipts".
fn load_event_receipts<'a>(
&'a self,
event_id: &'a EventId,
) -> BoxFuture<'a, IndexMap<OwnedUserId, Receipt>>;
/// Loads the event ID stored in the room's fully-read marker, if any.
///
/// The [`Room`] implementation reads the [`FullyReadEventContent`] account
/// data. Store and deserialization errors are logged and yield `None`.
fn load_fully_read_marker(&self) -> BoxFuture<'_, Option<OwnedEventId>>;
/// Loads the push [`Ruleset`] together with the room's
/// [`PushConditionRoomCtx`].
///
/// The [`Room`] implementation yields `None` when either part is
/// unavailable, logging the reason.
fn push_rules_and_context(&self) -> BoxFuture<'_, Option<(Ruleset, PushConditionRoomCtx)>>;
/// Sends `content` to the room.
///
/// The [`Room`] implementation hands the content to the room's send queue
/// and discards the value returned on success.
fn send(&self, content: AnyMessageLikeEventContent) -> BoxFuture<'_, Result<(), super::Error>>;
/// Redacts the event `event_id`, with an optional `reason` and an optional
/// caller-chosen `transaction_id`.
///
/// The [`Room`] implementation maps a failure to
/// `super::Error::RedactError(RedactError::HttpError(_))`.
fn redact<'a>(
&'a self,
event_id: &'a EventId,
reason: Option<&'a str>,
transaction_id: Option<OwnedTransactionId>,
) -> BoxFuture<'a, Result<(), super::Error>>;
/// Returns a [`Subscriber`] to the room's [`RoomInfo`].
fn room_info(&self) -> Subscriber<RoomInfo>;
}
impl RoomDataProvider for Room {
fn own_user_id(&self) -> &UserId {
(**self).own_user_id()
}
fn room_version(&self) -> RoomVersionId {
(**self).clone_info().room_version_or_default()
}
fn crypto_context_info(&self) -> BoxFuture<'_, CryptoContextInfo> {
async move { self.crypto_context_info().await }.boxed()
}
fn profile_from_user_id<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<Profile>> {
async move {
match self.get_member_no_sync(user_id).await {
Ok(Some(member)) => Some(Profile {
display_name: member.display_name().map(ToOwned::to_owned),
display_name_ambiguous: member.name_ambiguous(),
avatar_url: member.avatar_url().map(ToOwned::to_owned),
}),
Ok(None) if self.are_members_synced() => Some(Profile::default()),
Ok(None) => None,
Err(e) => {
error!(%user_id, "Failed to fetch room member information: {e}");
None
}
}
}
.boxed()
}
fn profile_from_latest_event(&self, latest_event: &LatestEvent) -> Option<Profile> {
if !latest_event.has_sender_profile() {
return None;
}
Some(Profile {
display_name: latest_event.sender_display_name().map(ToOwned::to_owned),
display_name_ambiguous: latest_event.sender_name_ambiguous().unwrap_or(false),
avatar_url: latest_event.sender_avatar_url().map(ToOwned::to_owned),
})
}
fn load_user_receipt<'a>(
&'a self,
receipt_type: ReceiptType,
thread: ReceiptThread,
user_id: &'a UserId,
) -> BoxFuture<'a, Option<(OwnedEventId, Receipt)>> {
async move {
match self.load_user_receipt(receipt_type.clone(), thread.clone(), user_id).await {
Ok(receipt) => receipt,
Err(e) => {
error!(
?receipt_type,
?thread,
?user_id,
"Failed to get read receipt for user: {e}"
);
None
}
}
}
.boxed()
}
fn load_event_receipts<'a>(
&'a self,
event_id: &'a EventId,
) -> BoxFuture<'a, IndexMap<OwnedUserId, Receipt>> {
async move {
let mut unthreaded_receipts = match self
.load_event_receipts(ReceiptType::Read, ReceiptThread::Unthreaded, event_id)
.await
{
Ok(receipts) => receipts.into_iter().collect(),
Err(e) => {
error!(?event_id, "Failed to get unthreaded read receipts for event: {e}");
IndexMap::new()
}
};
let main_thread_receipts = match self
.load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id)
.await
{
Ok(receipts) => receipts,
Err(e) => {
error!(?event_id, "Failed to get main thread read receipts for event: {e}");
Vec::new()
}
};
unthreaded_receipts.extend(main_thread_receipts);
unthreaded_receipts
}
.boxed()
}
fn push_rules_and_context(&self) -> BoxFuture<'_, Option<(Ruleset, PushConditionRoomCtx)>> {
async {
match self.push_context().await {
Ok(Some(push_context)) => match self.client().account().push_rules().await {
Ok(push_rules) => Some((push_rules, push_context)),
Err(e) => {
error!("Could not get push rules: {e}");
None
}
},
Ok(None) => {
debug!("Could not aggregate push context");
None
}
Err(e) => {
error!("Could not get push context: {e}");
None
}
}
}
.boxed()
}
fn load_fully_read_marker(&self) -> BoxFuture<'_, Option<OwnedEventId>> {
async {
match self.account_data_static::<FullyReadEventContent>().await {
Ok(Some(fully_read)) => match fully_read.deserialize() {
Ok(fully_read) => Some(fully_read.content.event_id),
Err(e) => {
error!("Failed to deserialize fully-read account data: {e}");
None
}
},
Err(e) => {
error!("Failed to get fully-read account data from the store: {e}");
None
}
_ => None,
}
}
.boxed()
}
fn send(&self, content: AnyMessageLikeEventContent) -> BoxFuture<'_, Result<(), super::Error>> {
async move {
let _ = self.send_queue().send(content).await?;
Ok(())
}
.boxed()
}
fn redact<'a>(
&'a self,
event_id: &'a EventId,
reason: Option<&'a str>,
transaction_id: Option<OwnedTransactionId>,
) -> BoxFuture<'a, Result<(), super::Error>> {
async move {
let _ = self
.redact(event_id, reason, transaction_id)
.await
.map_err(RedactError::HttpError)
.map_err(super::Error::RedactError)?;
Ok(())
}
.boxed()
}
fn room_info(&self) -> Subscriber<RoomInfo> {
self.subscribe_info()
}
}
/// Something that can turn a raw sync timeline event into a
/// [`TimelineEvent`] by attempting to decrypt it.
///
/// This file implements it for [`Room`] and, in test builds only, for an
/// `(OlmMachine, OwnedRoomId)` pair.
pub(super) trait Decryptor: Clone + Send + Sync + 'static {
/// Attempts to decrypt `raw`, returning a future that resolves to the
/// resulting [`TimelineEvent`] or to an error.
fn decrypt_event_impl(
&self,
raw: &Raw<AnySyncTimelineEvent>,
) -> impl Future<Output = Result<TimelineEvent>> + Send;
}
impl Decryptor for Room {
    /// Decrypts `raw` by forwarding it to the room's `decrypt_event`.
    async fn decrypt_event_impl(&self, raw: &Raw<AnySyncTimelineEvent>) -> Result<TimelineEvent> {
        // Reinterpret the raw JSON as the event type `decrypt_event` expects;
        // the target type is inferred from that method's parameter.
        let encrypted = raw.cast_ref();
        self.decrypt_event(encrypted).await
    }
}
#[cfg(test)]
impl Decryptor for (matrix_sdk_base::crypto::OlmMachine, ruma::OwnedRoomId) {
    /// Test-only decryptor that talks to an `OlmMachine` directly, using the
    /// room ID stored next to it in the tuple.
    async fn decrypt_event_impl(&self, raw: &Raw<AnySyncTimelineEvent>) -> Result<TimelineEvent> {
        let (machine, room) = self;

        // Accept events from any sender device, trusted or not.
        let settings =
            DecryptionSettings { sender_device_trust_requirement: TrustRequirement::Untrusted };

        let outcome = machine.try_decrypt_room_event(raw.cast_ref(), room, &settings).await?;

        Ok(match outcome {
            RoomEventDecryptionResult::Decrypted(decrypted) => TimelineEvent::from(decrypted),
            // An undecryptable event still becomes a timeline event, carrying
            // the original raw JSON and the failure details.
            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
                TimelineEvent::new_utd_event(raw.clone(), utd_info)
            }
        })
    }
}