1use std::{future::Future, sync::Arc};
16
17use eyeball::Subscriber;
18use indexmap::IndexMap;
19use matrix_sdk::{
20 AsyncTraitDeps, Result, Room, SendOutsideWasm,
21 crypto::types::events::CryptoContextInfo,
22 deserialized_responses::{EncryptionInfo, TimelineEvent},
23 paginators::{PaginableRoom, thread::PaginableThread},
24 room::PushContext,
25};
26use matrix_sdk_base::{RoomInfo, latest_event::LatestEvent};
27use ruma::{
28 EventId, OwnedEventId, OwnedTransactionId, OwnedUserId, UserId,
29 events::{
30 AnyMessageLikeEventContent, AnySyncTimelineEvent,
31 fully_read::FullyReadEventContent,
32 receipt::{Receipt, ReceiptThread, ReceiptType},
33 room::encrypted::OriginalSyncRoomEncryptedEvent,
34 },
35 room_version_rules::RoomVersionRules,
36 serde::Raw,
37};
38use tracing::error;
39
40use super::{EventTimelineItem, Profile, RedactError, TimelineBuilder};
41use crate::timeline::{
42 self, Timeline, latest_event::LatestEventValue, pinned_events_loader::PinnedEventsRoom,
43};
44
45pub trait RoomExt {
46 fn timeline(&self)
54 -> impl Future<Output = Result<Timeline, timeline::Error>> + SendOutsideWasm;
55
56 fn timeline_builder(&self) -> TimelineBuilder;
65
66 fn latest_event_item(
69 &self,
70 ) -> impl Future<Output = Option<EventTimelineItem>> + SendOutsideWasm;
71
72 fn new_latest_event(&self) -> impl Future<Output = LatestEventValue>;
74}
75
76impl RoomExt for Room {
77 async fn timeline(&self) -> Result<Timeline, timeline::Error> {
78 self.timeline_builder().build().await
79 }
80
81 fn timeline_builder(&self) -> TimelineBuilder {
82 TimelineBuilder::new(self).track_read_marker_and_receipts()
83 }
84
85 async fn latest_event_item(&self) -> Option<EventTimelineItem> {
86 if let Some(latest_event) = self.latest_event() {
87 EventTimelineItem::from_latest_event(self.client(), self.room_id(), latest_event).await
88 } else {
89 None
90 }
91 }
92
93 async fn new_latest_event(&self) -> LatestEventValue {
94 LatestEventValue::from_base_latest_event_value(
95 (**self).new_latest_event(),
96 self,
97 &self.client(),
98 )
99 .await
100 }
101}
102
103pub(super) trait RoomDataProvider:
104 Clone + Decryptor + PaginableRoom + PaginableThread + PinnedEventsRoom + 'static
105{
106 fn own_user_id(&self) -> &UserId;
107 fn room_version_rules(&self) -> RoomVersionRules;
108
109 fn crypto_context_info(&self)
110 -> impl Future<Output = CryptoContextInfo> + SendOutsideWasm + '_;
111
112 fn profile_from_user_id<'a>(
113 &'a self,
114 user_id: &'a UserId,
115 ) -> impl Future<Output = Option<Profile>> + SendOutsideWasm + 'a;
116 fn profile_from_latest_event(&self, latest_event: &LatestEvent) -> Option<Profile>;
117
118 fn load_user_receipt<'a>(
120 &'a self,
121 receipt_type: ReceiptType,
122 thread: ReceiptThread,
123 user_id: &'a UserId,
124 ) -> impl Future<Output = Option<(OwnedEventId, Receipt)>> + SendOutsideWasm + 'a;
125
126 fn load_event_receipts<'a>(
128 &'a self,
129 event_id: &'a EventId,
130 receipt_thread: ReceiptThread,
131 ) -> impl Future<Output = IndexMap<OwnedUserId, Receipt>> + SendOutsideWasm + 'a;
132
133 fn load_fully_read_marker(&self) -> impl Future<Output = Option<OwnedEventId>> + '_;
135
136 fn push_context(&self) -> impl Future<Output = Option<PushContext>> + SendOutsideWasm + '_;
137
138 fn send(
140 &self,
141 content: AnyMessageLikeEventContent,
142 ) -> impl Future<Output = Result<(), super::Error>> + SendOutsideWasm + '_;
143
144 fn redact<'a>(
146 &'a self,
147 event_id: &'a EventId,
148 reason: Option<&'a str>,
149 transaction_id: Option<OwnedTransactionId>,
150 ) -> impl Future<Output = Result<(), super::Error>> + SendOutsideWasm + 'a;
151
152 fn room_info(&self) -> Subscriber<RoomInfo>;
153
154 fn get_encryption_info(
157 &self,
158 session_id: &str,
159 sender: &UserId,
160 ) -> impl Future<Output = Option<Arc<EncryptionInfo>>> + SendOutsideWasm;
161
162 fn load_event<'a>(
164 &'a self,
165 event_id: &'a EventId,
166 ) -> impl Future<Output = Result<TimelineEvent>> + SendOutsideWasm + 'a;
167}
168
169impl RoomDataProvider for Room {
170 fn own_user_id(&self) -> &UserId {
171 (**self).own_user_id()
172 }
173
174 fn room_version_rules(&self) -> RoomVersionRules {
175 (**self).clone_info().room_version_rules_or_default()
176 }
177
178 async fn crypto_context_info(&self) -> CryptoContextInfo {
179 self.crypto_context_info().await
180 }
181
182 async fn profile_from_user_id<'a>(&'a self, user_id: &'a UserId) -> Option<Profile> {
183 match self.get_member_no_sync(user_id).await {
184 Ok(Some(member)) => Some(Profile {
185 display_name: member.display_name().map(ToOwned::to_owned),
186 display_name_ambiguous: member.name_ambiguous(),
187 avatar_url: member.avatar_url().map(ToOwned::to_owned),
188 }),
189 Ok(None) if self.are_members_synced() => Some(Profile::default()),
190 Ok(None) => None,
191 Err(e) => {
192 error!(%user_id, "Failed to fetch room member information: {e}");
193 None
194 }
195 }
196 }
197
198 fn profile_from_latest_event(&self, latest_event: &LatestEvent) -> Option<Profile> {
199 if !latest_event.has_sender_profile() {
200 return None;
201 }
202
203 Some(Profile {
204 display_name: latest_event.sender_display_name().map(ToOwned::to_owned),
205 display_name_ambiguous: latest_event.sender_name_ambiguous().unwrap_or(false),
206 avatar_url: latest_event.sender_avatar_url().map(ToOwned::to_owned),
207 })
208 }
209
210 async fn load_user_receipt<'a>(
211 &'a self,
212 receipt_type: ReceiptType,
213 thread: ReceiptThread,
214 user_id: &'a UserId,
215 ) -> Option<(OwnedEventId, Receipt)> {
216 match self.load_user_receipt(receipt_type.clone(), thread.clone(), user_id).await {
217 Ok(receipt) => receipt,
218 Err(e) => {
219 error!(
220 ?receipt_type,
221 ?thread,
222 ?user_id,
223 "Failed to get read receipt for user: {e}"
224 );
225 None
226 }
227 }
228 }
229
230 async fn load_event_receipts<'a>(
231 &'a self,
232 event_id: &'a EventId,
233 receipt_thread: ReceiptThread,
234 ) -> IndexMap<OwnedUserId, Receipt> {
235 let mut result = match self
236 .load_event_receipts(ReceiptType::Read, receipt_thread.clone(), event_id)
237 .await
238 {
239 Ok(receipts) => receipts.into_iter().collect(),
240 Err(e) => {
241 error!(?event_id, ?receipt_thread, "Failed to get read receipts for event: {e}");
242 IndexMap::new()
243 }
244 };
245
246 if receipt_thread == ReceiptThread::Unthreaded {
247 let main_thread_receipts = match self
250 .load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id)
251 .await
252 {
253 Ok(receipts) => receipts,
254 Err(e) => {
255 error!(?event_id, "Failed to get main thread read receipts for event: {e}");
256 Vec::new()
257 }
258 };
259 result.extend(main_thread_receipts);
260 }
261
262 result
263 }
264
265 async fn push_context(&self) -> Option<PushContext> {
266 self.push_context().await.ok().flatten()
267 }
268
269 async fn load_fully_read_marker(&self) -> Option<OwnedEventId> {
270 match self.account_data_static::<FullyReadEventContent>().await {
271 Ok(Some(fully_read)) => match fully_read.deserialize() {
272 Ok(fully_read) => Some(fully_read.content.event_id),
273 Err(e) => {
274 error!("Failed to deserialize fully-read account data: {e}");
275 None
276 }
277 },
278 Err(e) => {
279 error!("Failed to get fully-read account data from the store: {e}");
280 None
281 }
282 _ => None,
283 }
284 }
285
286 async fn send(&self, content: AnyMessageLikeEventContent) -> Result<(), super::Error> {
287 let _ = self.send_queue().send(content).await?;
288 Ok(())
289 }
290
291 async fn redact<'a>(
292 &'a self,
293 event_id: &'a EventId,
294 reason: Option<&'a str>,
295 transaction_id: Option<OwnedTransactionId>,
296 ) -> Result<(), super::Error> {
297 let _ = self
298 .redact(event_id, reason, transaction_id)
299 .await
300 .map_err(RedactError::HttpError)
301 .map_err(super::Error::RedactError)?;
302 Ok(())
303 }
304
305 fn room_info(&self) -> Subscriber<RoomInfo> {
306 self.subscribe_info()
307 }
308
309 async fn get_encryption_info(
310 &self,
311 session_id: &str,
312 sender: &UserId,
313 ) -> Option<Arc<EncryptionInfo>> {
314 self.get_encryption_info(session_id, sender).await
316 }
317
318 async fn load_event<'a>(&'a self, event_id: &'a EventId) -> Result<TimelineEvent> {
319 self.load_or_fetch_event(event_id, None).await
320 }
321}
322
323pub(crate) trait Decryptor: AsyncTraitDeps + Clone + 'static {
326 fn decrypt_event_impl(
327 &self,
328 raw: &Raw<AnySyncTimelineEvent>,
329 push_ctx: Option<&PushContext>,
330 ) -> impl Future<Output = Result<TimelineEvent>> + SendOutsideWasm;
331}
332
333impl Decryptor for Room {
334 async fn decrypt_event_impl(
335 &self,
336 raw: &Raw<AnySyncTimelineEvent>,
337 push_ctx: Option<&PushContext>,
338 ) -> Result<TimelineEvent> {
339 self.decrypt_event(raw.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(), push_ctx)
343 .await
344 }
345}