1use std::future::Future;
16
17use eyeball::Subscriber;
18use indexmap::IndexMap;
19#[cfg(test)]
20use matrix_sdk::crypto::{DecryptionSettings, RoomEventDecryptionResult, TrustRequirement};
21use matrix_sdk::{
22 crypto::types::events::CryptoContextInfo, deserialized_responses::TimelineEvent,
23 event_cache::paginator::PaginableRoom, AsyncTraitDeps, Result, Room, SendOutsideWasm,
24};
25use matrix_sdk_base::{latest_event::LatestEvent, RoomInfo};
26use ruma::{
27 events::{
28 fully_read::FullyReadEventContent,
29 receipt::{Receipt, ReceiptThread, ReceiptType},
30 AnyMessageLikeEventContent, AnySyncTimelineEvent,
31 },
32 push::{PushConditionRoomCtx, Ruleset},
33 serde::Raw,
34 EventId, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomVersionId, UserId,
35};
36use tracing::{debug, error};
37
38use super::{Profile, RedactError, TimelineBuilder};
39use crate::timeline::{self, pinned_events_loader::PinnedEventsRoom, Timeline};
40
/// Extension trait exposing timeline construction on a room type.
///
/// The only implementation visible in this file is the one for `Room`.
pub trait RoomExt {
    /// Returns a future that resolves to a `Timeline` for this room, or to a
    /// `timeline::Error` if the timeline could not be created.
    ///
    /// The returned future carries the `SendOutsideWasm` bound.
    fn timeline(&self)
        -> impl Future<Output = Result<Timeline, timeline::Error>> + SendOutsideWasm;

    /// Returns a `TimelineBuilder` for this room.
    ///
    /// The `Room` implementation in this file hands back a builder that has
    /// already had `track_read_marker_and_receipts()` applied.
    fn timeline_builder(&self) -> TimelineBuilder;
}
62
63impl RoomExt for Room {
64 async fn timeline(&self) -> Result<Timeline, timeline::Error> {
65 self.timeline_builder().build().await
66 }
67
68 fn timeline_builder(&self) -> TimelineBuilder {
69 Timeline::builder(self).track_read_marker_and_receipts()
70 }
71}
72
/// Abstraction over the room-level data and actions used by the timeline
/// code in the parent module.
///
/// Implementors must also be `Clone`, `PaginableRoom`, `PinnedEventsRoom` and
/// `'static`. The implementation for `Room` in this file logs lookup failures
/// and reports them as "no value" (`None` / empty map); only `send` and
/// `redact` return errors to the caller.
pub(super) trait RoomDataProvider:
    Clone + PaginableRoom + PinnedEventsRoom + 'static
{
    /// Returns the own (local) user's ID as known to the room.
    fn own_user_id(&self) -> &UserId;

    /// Returns the room version.
    ///
    /// The `Room` implementation uses `room_version_or_default()`, so a value
    /// is always produced.
    fn room_version(&self) -> RoomVersionId;

    /// Returns a future resolving to the room's `CryptoContextInfo`.
    fn crypto_context_info(&self)
        -> impl Future<Output = CryptoContextInfo> + SendOutsideWasm + '_;

    /// Resolves the profile of `user_id` in this room.
    ///
    /// Resolves to `None` when no profile could be determined. In the `Room`
    /// implementation that is the case when the member lookup fails, or when
    /// the member is unknown and the member list is not synced yet.
    fn profile_from_user_id<'a>(
        &'a self,
        user_id: &'a UserId,
    ) -> impl Future<Output = Option<Profile>> + SendOutsideWasm + 'a;

    /// Builds a profile from the sender information attached to
    /// `latest_event`, or returns `None` if no profile can be derived from it.
    fn profile_from_latest_event(&self, latest_event: &LatestEvent) -> Option<Profile>;

    /// Loads the receipt of type `receipt_type` in `thread` for `user_id`.
    ///
    /// Resolves to the receipt together with the ID of the event it is
    /// attached to, or `None` if there is none (the `Room` implementation
    /// also returns `None` when loading fails).
    fn load_user_receipt<'a>(
        &'a self,
        receipt_type: ReceiptType,
        thread: ReceiptThread,
        user_id: &'a UserId,
    ) -> impl Future<Output = Option<(OwnedEventId, Receipt)>> + SendOutsideWasm + 'a;

    /// Loads the receipts attached to `event_id`, keyed by user ID.
    ///
    /// The `Room` implementation merges the unthreaded and main-thread
    /// `ReceiptType::Read` receipts.
    fn load_event_receipts<'a>(
        &'a self,
        event_id: &'a EventId,
    ) -> impl Future<Output = IndexMap<OwnedUserId, Receipt>> + SendOutsideWasm + 'a;

    /// Loads the event ID the fully-read marker points at, if any.
    ///
    // NOTE(review): unlike the other async methods of this trait, the returned
    // future is not bounded by `SendOutsideWasm` — confirm this is intended.
    fn load_fully_read_marker(&self) -> impl Future<Output = Option<OwnedEventId>> + '_;

    /// Resolves to the push rules and the push-condition context of the room,
    /// or `None` if either is unavailable.
    fn push_rules_and_context(
        &self,
    ) -> impl Future<Output = Option<(Ruleset, PushConditionRoomCtx)>> + SendOutsideWasm + '_;

    /// Sends `content` as a message-like event to the room.
    ///
    /// The `Room` implementation hands the content to the room's send queue.
    fn send(
        &self,
        content: AnyMessageLikeEventContent,
    ) -> impl Future<Output = Result<(), super::Error>> + SendOutsideWasm + '_;

    /// Redacts the event `event_id`, with an optional `reason` and an
    /// optional `transaction_id`.
    fn redact<'a>(
        &'a self,
        event_id: &'a EventId,
        reason: Option<&'a str>,
        transaction_id: Option<OwnedTransactionId>,
    ) -> impl Future<Output = Result<(), super::Error>> + SendOutsideWasm + 'a;

    /// Returns a subscriber to the room's `RoomInfo`.
    fn room_info(&self) -> Subscriber<RoomInfo>;
}
125
126impl RoomDataProvider for Room {
127 fn own_user_id(&self) -> &UserId {
128 (**self).own_user_id()
129 }
130
131 fn room_version(&self) -> RoomVersionId {
132 (**self).clone_info().room_version_or_default()
133 }
134
135 async fn crypto_context_info(&self) -> CryptoContextInfo {
136 self.crypto_context_info().await
137 }
138
139 async fn profile_from_user_id<'a>(&'a self, user_id: &'a UserId) -> Option<Profile> {
140 match self.get_member_no_sync(user_id).await {
141 Ok(Some(member)) => Some(Profile {
142 display_name: member.display_name().map(ToOwned::to_owned),
143 display_name_ambiguous: member.name_ambiguous(),
144 avatar_url: member.avatar_url().map(ToOwned::to_owned),
145 }),
146 Ok(None) if self.are_members_synced() => Some(Profile::default()),
147 Ok(None) => None,
148 Err(e) => {
149 error!(%user_id, "Failed to fetch room member information: {e}");
150 None
151 }
152 }
153 }
154
155 fn profile_from_latest_event(&self, latest_event: &LatestEvent) -> Option<Profile> {
156 if !latest_event.has_sender_profile() {
157 return None;
158 }
159
160 Some(Profile {
161 display_name: latest_event.sender_display_name().map(ToOwned::to_owned),
162 display_name_ambiguous: latest_event.sender_name_ambiguous().unwrap_or(false),
163 avatar_url: latest_event.sender_avatar_url().map(ToOwned::to_owned),
164 })
165 }
166
167 async fn load_user_receipt<'a>(
168 &'a self,
169 receipt_type: ReceiptType,
170 thread: ReceiptThread,
171 user_id: &'a UserId,
172 ) -> Option<(OwnedEventId, Receipt)> {
173 match self.load_user_receipt(receipt_type.clone(), thread.clone(), user_id).await {
174 Ok(receipt) => receipt,
175 Err(e) => {
176 error!(
177 ?receipt_type,
178 ?thread,
179 ?user_id,
180 "Failed to get read receipt for user: {e}"
181 );
182 None
183 }
184 }
185 }
186
187 async fn load_event_receipts<'a>(
188 &'a self,
189 event_id: &'a EventId,
190 ) -> IndexMap<OwnedUserId, Receipt> {
191 let mut unthreaded_receipts = match self
192 .load_event_receipts(ReceiptType::Read, ReceiptThread::Unthreaded, event_id)
193 .await
194 {
195 Ok(receipts) => receipts.into_iter().collect(),
196 Err(e) => {
197 error!(?event_id, "Failed to get unthreaded read receipts for event: {e}");
198 IndexMap::new()
199 }
200 };
201
202 let main_thread_receipts = match self
203 .load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id)
204 .await
205 {
206 Ok(receipts) => receipts,
207 Err(e) => {
208 error!(?event_id, "Failed to get main thread read receipts for event: {e}");
209 Vec::new()
210 }
211 };
212
213 unthreaded_receipts.extend(main_thread_receipts);
214 unthreaded_receipts
215 }
216
217 async fn push_rules_and_context(&self) -> Option<(Ruleset, PushConditionRoomCtx)> {
218 match self.push_context().await {
219 Ok(Some(push_context)) => match self.client().account().push_rules().await {
220 Ok(push_rules) => Some((push_rules, push_context)),
221 Err(e) => {
222 error!("Could not get push rules: {e}");
223 None
224 }
225 },
226 Ok(None) => {
227 debug!("Could not aggregate push context");
228 None
229 }
230 Err(e) => {
231 error!("Could not get push context: {e}");
232 None
233 }
234 }
235 }
236
237 async fn load_fully_read_marker(&self) -> Option<OwnedEventId> {
238 match self.account_data_static::<FullyReadEventContent>().await {
239 Ok(Some(fully_read)) => match fully_read.deserialize() {
240 Ok(fully_read) => Some(fully_read.content.event_id),
241 Err(e) => {
242 error!("Failed to deserialize fully-read account data: {e}");
243 None
244 }
245 },
246 Err(e) => {
247 error!("Failed to get fully-read account data from the store: {e}");
248 None
249 }
250 _ => None,
251 }
252 }
253
254 async fn send(&self, content: AnyMessageLikeEventContent) -> Result<(), super::Error> {
255 let _ = self.send_queue().send(content).await?;
256 Ok(())
257 }
258
259 async fn redact<'a>(
260 &'a self,
261 event_id: &'a EventId,
262 reason: Option<&'a str>,
263 transaction_id: Option<OwnedTransactionId>,
264 ) -> Result<(), super::Error> {
265 let _ = self
266 .redact(event_id, reason, transaction_id)
267 .await
268 .map_err(RedactError::HttpError)
269 .map_err(super::Error::RedactError)?;
270 Ok(())
271 }
272
273 fn room_info(&self) -> Subscriber<RoomInfo> {
274 self.subscribe_info()
275 }
276}
277
/// Something able to turn a raw sync timeline event into a `TimelineEvent`.
///
/// Implementors must satisfy `AsyncTraitDeps`, `Clone` and `'static`. This
/// file implements it for `Room` and, in test builds only, for an
/// `(OlmMachine, OwnedRoomId)` pair.
pub(super) trait Decryptor: AsyncTraitDeps + Clone + 'static {
    /// Attempts to decrypt `raw`, resolving to the resulting `TimelineEvent`
    /// or to an error.
    fn decrypt_event_impl(
        &self,
        raw: &Raw<AnySyncTimelineEvent>,
    ) -> impl Future<Output = Result<TimelineEvent>> + SendOutsideWasm;
}
286
287impl Decryptor for Room {
288 async fn decrypt_event_impl(&self, raw: &Raw<AnySyncTimelineEvent>) -> Result<TimelineEvent> {
289 self.decrypt_event(raw.cast_ref()).await
290 }
291}
292
/// Test-only `Decryptor` that drives an `OlmMachine` directly for the given
/// room ID.
#[cfg(test)]
impl Decryptor for (matrix_sdk_base::crypto::OlmMachine, ruma::OwnedRoomId) {
    /// Tries to decrypt `raw` with the `Untrusted` sender-device trust
    /// requirement.
    ///
    /// An event that cannot be decrypted is still returned as `Ok`, wrapped
    /// as a UTD timeline event; only an error from the machine itself is
    /// propagated.
    async fn decrypt_event_impl(&self, raw: &Raw<AnySyncTimelineEvent>) -> Result<TimelineEvent> {
        let machine = &self.0;
        let room_id = &self.1;

        let settings =
            DecryptionSettings { sender_device_trust_requirement: TrustRequirement::Untrusted };

        let outcome = machine.try_decrypt_room_event(raw.cast_ref(), room_id, &settings).await?;

        let event: TimelineEvent = match outcome {
            RoomEventDecryptionResult::Decrypted(decrypted) => decrypted.into(),
            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
                TimelineEvent::new_utd_event(raw.clone(), utd_info)
            }
        };

        Ok(event)
    }
}