1use std::{
16 collections::{BTreeMap, BTreeSet},
17 sync::Arc,
18};
19
20use futures_core::Stream;
21use futures_util::{pin_mut, StreamExt};
22use matrix_sdk::{
23 crypto::store::RoomKeyInfo,
24 encryption::backups::BackupState,
25 event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheListener, RoomEventCacheUpdate},
26 executor::spawn,
27 send_queue::RoomSendQueueUpdate,
28 Room,
29};
30use ruma::{events::AnySyncTimelineEvent, OwnedEventId, RoomVersionId};
31use tokio::sync::broadcast::{error::RecvError, Receiver};
32use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
33use tracing::{info_span, trace, warn, Instrument, Span};
34
35use super::{
36 controller::{TimelineController, TimelineSettings},
37 to_device::{handle_forwarded_room_key_event, handle_room_key_event},
38 DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
39};
40use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager};
41
/// Builder used to configure and then create a [`Timeline`] for a room.
///
/// Every configuration method consumes the builder and returns it, so calls
/// can be chained; the final `build()` call produces the [`Timeline`].
#[must_use]
#[derive(Debug)]
pub struct TimelineBuilder {
    /// The room the timeline is built for.
    room: Room,
    /// Settings handed to the `TimelineController` when building.
    settings: TimelineSettings,
    /// What the timeline focuses on; `TimelineFocus::Live` unless overridden.
    focus: TimelineFocus,

    /// Optional hook manager handed to the `TimelineController` when building.
    ///
    /// NOTE(review): presumably notified about unable-to-decrypt events;
    /// confirm against `UtdHookManager`.
    unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,

    /// Optional prefix handed to the `TimelineController` when building, and
    /// recorded as the `prefix` field of the background tasks' tracing spans.
    internal_id_prefix: Option<String>,
}
58
59impl TimelineBuilder {
60 pub(super) fn new(room: &Room) -> Self {
61 Self {
62 room: room.clone(),
63 settings: TimelineSettings::default(),
64 unable_to_decrypt_hook: None,
65 focus: TimelineFocus::Live,
66 internal_id_prefix: None,
67 }
68 }
69
70 pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
74 self.focus = focus;
75 self
76 }
77
78 pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
83 self.unable_to_decrypt_hook = Some(hook);
84 self
85 }
86
87 pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
92 self.internal_id_prefix = Some(prefix);
93 self
94 }
95
96 pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
99 self.settings.date_divider_mode = mode;
100 self
101 }
102
103 pub fn track_read_marker_and_receipts(mut self) -> Self {
106 self.settings.track_read_receipts = true;
107 self
108 }
109
110 pub fn event_filter<F>(mut self, filter: F) -> Self
136 where
137 F: Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool + Send + Sync + 'static,
138 {
139 self.settings.event_filter = Arc::new(filter);
140 self
141 }
142
143 pub fn add_failed_to_parse(mut self, add: bool) -> Self {
147 self.settings.add_failed_to_parse = add;
148 self
149 }
150
151 #[tracing::instrument(
153 skip(self),
154 fields(
155 room_id = ?self.room.room_id(),
156 track_read_receipts = self.settings.track_read_receipts,
157 )
158 )]
159 pub async fn build(self) -> Result<Timeline, Error> {
160 let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;
161
162 let client = room.client();
163 let event_cache = client.event_cache();
164
165 event_cache.subscribe()?;
167
168 let (room_event_cache, event_cache_drop) = room.event_cache().await?;
169 let (_, event_subscriber) = room_event_cache.subscribe().await;
170
171 let is_live = matches!(focus, TimelineFocus::Live);
172 let is_pinned_events = matches!(focus, TimelineFocus::PinnedEvents { .. });
173 let is_room_encrypted = room
174 .latest_encryption_state()
175 .await
176 .map(|state| state.is_encrypted())
177 .ok()
178 .unwrap_or_default();
179
180 let controller = TimelineController::new(
181 room.clone(),
182 focus.clone(),
183 internal_id_prefix.clone(),
184 unable_to_decrypt_hook,
185 is_room_encrypted,
186 )
187 .with_settings(settings);
188
189 let has_events = controller.init_focus(&room_event_cache).await?;
190
191 let pinned_events_join_handle = if is_pinned_events {
192 Some(spawn(pinned_events_task(room.pinned_event_ids_stream(), controller.clone())))
193 } else {
194 None
195 };
196
197 let encryption_changes_handle = spawn({
198 let inner = controller.clone();
199 async move {
200 inner.handle_encryption_state_changes().await;
201 }
202 });
203
204 let room_update_join_handle = spawn({
205 let span = info_span!(
206 parent: Span::none(),
207 "live_update_handler",
208 room_id = ?room.room_id(),
209 focus = focus.debug_string(),
210 prefix = internal_id_prefix
211 );
212 span.follows_from(Span::current());
213
214 room_event_cache_updates_task(
215 room_event_cache.clone(),
216 controller.clone(),
217 event_subscriber,
218 is_live,
219 )
220 .instrument(span)
221 });
222
223 let local_echo_listener_handle = {
224 let timeline_controller = controller.clone();
225 let (local_echoes, send_queue_stream) = room.send_queue().subscribe().await?;
226
227 spawn({
228 for echo in local_echoes {
230 timeline_controller.handle_local_echo(echo).await;
231 }
232
233 let span = info_span!(
234 parent: Span::none(),
235 "local_echo_handler",
236 room_id = ?room.room_id(),
237 focus = focus.debug_string(),
238 prefix = internal_id_prefix
239 );
240 span.follows_from(Span::current());
241
242 room_send_queue_update_task(send_queue_stream, timeline_controller).instrument(span)
243 })
244 };
245
246 let room_key_handle = client.add_event_handler(handle_room_key_event(
247 controller.clone(),
248 room.room_id().to_owned(),
249 ));
250
251 let forwarded_room_key_handle = client.add_event_handler(handle_forwarded_room_key_event(
252 controller.clone(),
253 room.room_id().to_owned(),
254 ));
255
256 let event_handlers = vec![room_key_handle, forwarded_room_key_handle];
257
258 let room_key_from_backups_join_handle = spawn(room_keys_from_backups_task(
262 client.encryption().backups().room_keys_for_room_stream(controller.room().room_id()),
263 controller.clone(),
264 ));
265
266 let room_key_backup_enabled_join_handle = spawn(backup_states_task(
267 client.encryption().backups().state_stream(),
268 controller.clone(),
269 ));
270
271 let room_keys_received_join_handle = {
277 spawn(room_key_received_task(
278 client.encryption().room_keys_received_stream().await.expect(
279 "We should be logged in by now, so we should have access to an `OlmMachine` \
280 to be able to listen to this stream",
281 ),
282 controller.clone(),
283 ))
284 };
285
286 let timeline = Timeline {
287 controller,
288 event_cache: room_event_cache,
289 drop_handle: Arc::new(TimelineDropHandle {
290 client,
291 event_handler_handles: event_handlers,
292 room_update_join_handle,
293 pinned_events_join_handle,
294 room_key_from_backups_join_handle,
295 room_key_backup_enabled_join_handle,
296 room_keys_received_join_handle,
297 local_echo_listener_handle,
298 _event_cache_drop_handle: event_cache_drop,
299 encryption_changes_handle,
300 }),
301 };
302
303 if has_events {
304 timeline.retry_decryption_for_all_events().await;
308 }
309
310 Ok(timeline)
311 }
312}
313
314async fn pinned_events_task<S>(pinned_event_ids_stream: S, timeline_controller: TimelineController)
316where
317 S: Stream<Item = Vec<OwnedEventId>>,
318{
319 pin_mut!(pinned_event_ids_stream);
320
321 while pinned_event_ids_stream.next().await.is_some() {
322 if let Ok(events) = timeline_controller.reload_pinned_events().await {
323 timeline_controller
324 .replace_with_initial_remote_events(
325 events.into_iter(),
326 RemoteEventOrigin::Pagination,
327 )
328 .await;
329 }
330 }
331}
332
333async fn room_event_cache_updates_task(
335 room_event_cache: RoomEventCache,
336 timeline_controller: TimelineController,
337 mut event_subscriber: RoomEventCacheListener,
338 is_live: bool,
339) {
340 trace!("Spawned the event subscriber task.");
341
342 loop {
343 trace!("Waiting for an event.");
344
345 let update = match event_subscriber.recv().await {
346 Ok(up) => up,
347 Err(RecvError::Closed) => break,
348 Err(RecvError::Lagged(num_skipped)) => {
349 warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
350
351 let (initial_events, _stream) = room_event_cache.subscribe().await;
355
356 timeline_controller
357 .replace_with_initial_remote_events(
358 initial_events.into_iter(),
359 RemoteEventOrigin::Cache,
360 )
361 .await;
362
363 continue;
364 }
365 };
366
367 match update {
368 RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
369 trace!(target = %event_id, "Handling fully read marker.");
370 timeline_controller.handle_fully_read_marker(event_id).await;
371 }
372
373 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
374 trace!("Received new timeline events diffs");
375
376 if !is_live {
380 continue;
381 }
382
383 timeline_controller
384 .handle_remote_events_with_diffs(
385 diffs,
386 match origin {
387 EventsOrigin::Sync => RemoteEventOrigin::Sync,
388 EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
389 EventsOrigin::Cache => RemoteEventOrigin::Cache,
390 },
391 )
392 .await;
393
394 if matches!(origin, EventsOrigin::Cache) {
395 timeline_controller.retry_event_decryption(None).await;
396 }
397 }
398
399 RoomEventCacheUpdate::AddEphemeralEvents { events } => {
400 trace!("Received new ephemeral events from sync.");
401
402 timeline_controller.handle_ephemeral_events(events).await;
404 }
405
406 RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
407 if !ambiguity_changes.is_empty() {
408 let member_ambiguity_changes = ambiguity_changes
409 .values()
410 .flat_map(|change| change.user_ids())
411 .collect::<BTreeSet<_>>();
412 timeline_controller
413 .force_update_sender_profiles(&member_ambiguity_changes)
414 .await;
415 }
416 }
417 }
418 }
419}
420
421async fn room_send_queue_update_task(
423 mut send_queue_stream: Receiver<RoomSendQueueUpdate>,
424 timeline_controller: TimelineController,
425) {
426 trace!("spawned the local echo task!");
427
428 loop {
429 match send_queue_stream.recv().await {
430 Ok(update) => timeline_controller.handle_room_send_queue_update(update).await,
431
432 Err(RecvError::Lagged(num_missed)) => {
433 warn!("missed {num_missed} local echoes, ignoring those missed");
434 }
435
436 Err(RecvError::Closed) => {
437 trace!("channel closed, exiting the local echo handler");
438 break;
439 }
440 }
441 }
442}
443
444async fn room_keys_from_backups_task<S>(stream: S, timeline_controller: TimelineController)
446where
447 S: Stream<Item = Result<BTreeMap<String, BTreeSet<String>>, BroadcastStreamRecvError>>,
448{
449 pin_mut!(stream);
450
451 while let Some(update) = stream.next().await {
452 match update {
453 Ok(info) => {
454 let mut session_ids = BTreeSet::new();
455
456 for set in info.into_values() {
457 session_ids.extend(set);
458 }
459
460 timeline_controller.retry_event_decryption(Some(session_ids)).await;
461 }
462 Err(_) => timeline_controller.retry_event_decryption(None).await,
464 }
465 }
466}
467
468async fn backup_states_task<S>(backup_states_stream: S, timeline_controller: TimelineController)
470where
471 S: Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
472{
473 pin_mut!(backup_states_stream);
474
475 while let Some(update) = backup_states_stream.next().await {
476 match update {
477 Ok(BackupState::Enabled) | Err(_) => {
486 timeline_controller.retry_event_decryption(None).await;
487 }
488 Ok(
491 BackupState::Unknown
492 | BackupState::Creating
493 | BackupState::Resuming
494 | BackupState::Disabling
495 | BackupState::Downloading
496 | BackupState::Enabling,
497 ) => (),
498 }
499 }
500}
501
502async fn room_key_received_task<S>(
504 room_keys_received_stream: S,
505 timeline_controller: TimelineController,
506) where
507 S: Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>,
508{
509 pin_mut!(room_keys_received_stream);
510
511 let room_id = timeline_controller.room().room_id();
512
513 while let Some(room_keys) = room_keys_received_stream.next().await {
514 let session_ids = match room_keys {
515 Ok(room_keys) => {
516 let session_ids: BTreeSet<String> = room_keys
517 .into_iter()
518 .filter(|info| info.room_id == room_id)
519 .map(|info| info.session_id)
520 .collect();
521
522 Some(session_ids)
523 }
524 Err(BroadcastStreamRecvError::Lagged(missed_updates)) => {
525 warn!(
528 missed_updates,
529 "The room keys stream has lagged, retrying to decrypt the whole timeline"
530 );
531
532 None
533 }
534 };
535
536 timeline_controller.retry_event_decryption(session_ids).await;
537 }
538}