matrix_sdk_ui/timeline/
builder.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{BTreeMap, BTreeSet},
17    sync::Arc,
18};
19
20use futures_core::Stream;
21use futures_util::{pin_mut, StreamExt};
22use matrix_sdk::{
23    crypto::store::RoomKeyInfo,
24    encryption::backups::BackupState,
25    event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheListener, RoomEventCacheUpdate},
26    executor::spawn,
27    send_queue::RoomSendQueueUpdate,
28    Room,
29};
30use ruma::{events::AnySyncTimelineEvent, OwnedEventId, RoomVersionId};
31use tokio::sync::broadcast::{error::RecvError, Receiver};
32use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
33use tracing::{info_span, trace, warn, Instrument, Span};
34
35use super::{
36    controller::{TimelineController, TimelineSettings},
37    to_device::{handle_forwarded_room_key_event, handle_room_key_event},
38    DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
39};
40use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager};
41
42/// Builder that allows creating and configuring various parts of a
43/// [`Timeline`].
44#[must_use]
45#[derive(Debug)]
46pub struct TimelineBuilder {
47    room: Room,
48    settings: TimelineSettings,
49    focus: TimelineFocus,
50
51    /// An optional hook to call whenever we run into an unable-to-decrypt or a
52    /// late-decryption event.
53    unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
54
55    /// An optional prefix for internal IDs.
56    internal_id_prefix: Option<String>,
57}
58
59impl TimelineBuilder {
60    pub(super) fn new(room: &Room) -> Self {
61        Self {
62            room: room.clone(),
63            settings: TimelineSettings::default(),
64            unable_to_decrypt_hook: None,
65            focus: TimelineFocus::Live,
66            internal_id_prefix: None,
67        }
68    }
69
70    /// Sets up the initial focus for this timeline.
71    ///
72    /// This can be changed later on while the timeline is alive.
73    pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
74        self.focus = focus;
75        self
76    }
77
78    /// Sets up a hook to catch unable-to-decrypt (UTD) events for the timeline
79    /// we're building.
80    ///
81    /// If it was previously set before, will overwrite the previous one.
82    pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
83        self.unable_to_decrypt_hook = Some(hook);
84        self
85    }
86
87    /// Sets the internal id prefix for this timeline.
88    ///
89    /// The prefix will be prepended to any internal ID using when generating
90    /// timeline IDs for this timeline.
91    pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
92        self.internal_id_prefix = Some(prefix);
93        self
94    }
95
96    /// Chose when to insert the date separators, either in between each day
97    /// or each month.
98    pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
99        self.settings.date_divider_mode = mode;
100        self
101    }
102
103    /// Enable tracking of the fully-read marker and the read receipts on the
104    /// timeline.
105    pub fn track_read_marker_and_receipts(mut self) -> Self {
106        self.settings.track_read_receipts = true;
107        self
108    }
109
110    /// Use the given filter to choose whether to add events to the timeline.
111    ///
112    /// # Arguments
113    ///
114    /// * `filter` - A function that takes a deserialized event, and should
115    ///   return `true` if the event should be added to the `Timeline`.
116    ///
117    /// If this is not overridden, the timeline uses the default filter that
118    /// only allows events that are materialized into a `Timeline` item. For
119    /// instance, reactions and edits don't get their own timeline item (as
120    /// they affect another existing one), so they're "filtered out" to
121    /// reflect that.
122    ///
123    /// You can use the default event filter with
124    /// [`crate::timeline::default_event_filter`] so as to chain it with
125    /// your own event filter, if you want to avoid situations where a read
126    /// receipt would be attached to an event that doesn't get its own
127    /// timeline item.
128    ///
129    /// Note that currently:
130    ///
131    /// - Not all event types have a representation as a `TimelineItem` so these
132    ///   are not added no matter what the filter returns.
133    /// - It is not possible to filter out `m.room.encrypted` events (otherwise
134    ///   they couldn't be decrypted when the appropriate room key arrives).
135    pub fn event_filter<F>(mut self, filter: F) -> Self
136    where
137        F: Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool + Send + Sync + 'static,
138    {
139        self.settings.event_filter = Arc::new(filter);
140        self
141    }
142
143    /// Whether to add events that failed to deserialize to the timeline.
144    ///
145    /// Defaults to `true`.
146    pub fn add_failed_to_parse(mut self, add: bool) -> Self {
147        self.settings.add_failed_to_parse = add;
148        self
149    }
150
151    /// Create a [`Timeline`] with the options set on this builder.
152    #[tracing::instrument(
153        skip(self),
154        fields(
155            room_id = ?self.room.room_id(),
156            track_read_receipts = self.settings.track_read_receipts,
157        )
158    )]
159    pub async fn build(self) -> Result<Timeline, Error> {
160        let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;
161
162        let client = room.client();
163        let event_cache = client.event_cache();
164
165        // Subscribe the event cache to sync responses, in case we hadn't done it yet.
166        event_cache.subscribe()?;
167
168        let (room_event_cache, event_cache_drop) = room.event_cache().await?;
169        let (_, event_subscriber) = room_event_cache.subscribe().await;
170
171        let is_live = matches!(focus, TimelineFocus::Live);
172        let is_pinned_events = matches!(focus, TimelineFocus::PinnedEvents { .. });
173        let is_room_encrypted = room
174            .latest_encryption_state()
175            .await
176            .map(|state| state.is_encrypted())
177            .ok()
178            .unwrap_or_default();
179
180        let controller = TimelineController::new(
181            room.clone(),
182            focus.clone(),
183            internal_id_prefix.clone(),
184            unable_to_decrypt_hook,
185            is_room_encrypted,
186        )
187        .with_settings(settings);
188
189        let has_events = controller.init_focus(&room_event_cache).await?;
190
191        let pinned_events_join_handle = if is_pinned_events {
192            Some(spawn(pinned_events_task(room.pinned_event_ids_stream(), controller.clone())))
193        } else {
194            None
195        };
196
197        let encryption_changes_handle = spawn({
198            let inner = controller.clone();
199            async move {
200                inner.handle_encryption_state_changes().await;
201            }
202        });
203
204        let room_update_join_handle = spawn({
205            let span = info_span!(
206                parent: Span::none(),
207                "live_update_handler",
208                room_id = ?room.room_id(),
209                focus = focus.debug_string(),
210                prefix = internal_id_prefix
211            );
212            span.follows_from(Span::current());
213
214            room_event_cache_updates_task(
215                room_event_cache.clone(),
216                controller.clone(),
217                event_subscriber,
218                is_live,
219            )
220            .instrument(span)
221        });
222
223        let local_echo_listener_handle = {
224            let timeline_controller = controller.clone();
225            let (local_echoes, send_queue_stream) = room.send_queue().subscribe().await?;
226
227            spawn({
228                // Handles existing local echoes first.
229                for echo in local_echoes {
230                    timeline_controller.handle_local_echo(echo).await;
231                }
232
233                let span = info_span!(
234                    parent: Span::none(),
235                    "local_echo_handler",
236                    room_id = ?room.room_id(),
237                    focus = focus.debug_string(),
238                    prefix = internal_id_prefix
239                );
240                span.follows_from(Span::current());
241
242                room_send_queue_update_task(send_queue_stream, timeline_controller).instrument(span)
243            })
244        };
245
246        let room_key_handle = client.add_event_handler(handle_room_key_event(
247            controller.clone(),
248            room.room_id().to_owned(),
249        ));
250
251        let forwarded_room_key_handle = client.add_event_handler(handle_forwarded_room_key_event(
252            controller.clone(),
253            room.room_id().to_owned(),
254        ));
255
256        let event_handlers = vec![room_key_handle, forwarded_room_key_handle];
257
258        // Not using room.add_event_handler here because RoomKey events are
259        // to-device events that are not received in the context of a room.
260
261        let room_key_from_backups_join_handle = spawn(room_keys_from_backups_task(
262            client.encryption().backups().room_keys_for_room_stream(controller.room().room_id()),
263            controller.clone(),
264        ));
265
266        let room_key_backup_enabled_join_handle = spawn(backup_states_task(
267            client.encryption().backups().state_stream(),
268            controller.clone(),
269        ));
270
271        // TODO: Technically, this should be the only stream we need to listen to get
272        // notified when we should retry to decrypt an event. We sadly can't do that,
273        // since the cross-process support kills the `OlmMachine` which then in
274        // turn kills this stream. Once this is solved remove all the other ways we
275        // listen for room keys.
276        let room_keys_received_join_handle = {
277            spawn(room_key_received_task(
278                client.encryption().room_keys_received_stream().await.expect(
279                    "We should be logged in by now, so we should have access to an `OlmMachine` \
280                     to be able to listen to this stream",
281                ),
282                controller.clone(),
283            ))
284        };
285
286        let timeline = Timeline {
287            controller,
288            event_cache: room_event_cache,
289            drop_handle: Arc::new(TimelineDropHandle {
290                client,
291                event_handler_handles: event_handlers,
292                room_update_join_handle,
293                pinned_events_join_handle,
294                room_key_from_backups_join_handle,
295                room_key_backup_enabled_join_handle,
296                room_keys_received_join_handle,
297                local_echo_listener_handle,
298                _event_cache_drop_handle: event_cache_drop,
299                encryption_changes_handle,
300            }),
301        };
302
303        if has_events {
304            // The events we're injecting might be encrypted events, but we might
305            // have received the room key to decrypt them while nobody was listening to the
306            // `m.room_key` event, let's retry now.
307            timeline.retry_decryption_for_all_events().await;
308        }
309
310        Ok(timeline)
311    }
312}
313
314/// The task that handles the pinned event IDs updates.
315async fn pinned_events_task<S>(pinned_event_ids_stream: S, timeline_controller: TimelineController)
316where
317    S: Stream<Item = Vec<OwnedEventId>>,
318{
319    pin_mut!(pinned_event_ids_stream);
320
321    while pinned_event_ids_stream.next().await.is_some() {
322        if let Ok(events) = timeline_controller.reload_pinned_events().await {
323            timeline_controller
324                .replace_with_initial_remote_events(
325                    events.into_iter(),
326                    RemoteEventOrigin::Pagination,
327                )
328                .await;
329        }
330    }
331}
332
333/// The task that handles the [`RoomEventCacheUpdate`]s.
334async fn room_event_cache_updates_task(
335    room_event_cache: RoomEventCache,
336    timeline_controller: TimelineController,
337    mut event_subscriber: RoomEventCacheListener,
338    is_live: bool,
339) {
340    trace!("Spawned the event subscriber task.");
341
342    loop {
343        trace!("Waiting for an event.");
344
345        let update = match event_subscriber.recv().await {
346            Ok(up) => up,
347            Err(RecvError::Closed) => break,
348            Err(RecvError::Lagged(num_skipped)) => {
349                warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
350
351                // The updates might have lagged, but the room event cache might have
352                // events, so retrieve them and add them back again to the timeline,
353                // after clearing it.
354                let (initial_events, _stream) = room_event_cache.subscribe().await;
355
356                timeline_controller
357                    .replace_with_initial_remote_events(
358                        initial_events.into_iter(),
359                        RemoteEventOrigin::Cache,
360                    )
361                    .await;
362
363                continue;
364            }
365        };
366
367        match update {
368            RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
369                trace!(target = %event_id, "Handling fully read marker.");
370                timeline_controller.handle_fully_read_marker(event_id).await;
371            }
372
373            RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
374                trace!("Received new timeline events diffs");
375
376                // We shouldn't use the general way of adding events to timelines to
377                // non-live timelines, such as pinned events or focused timeline.
378                // These timelines should handle any live updates by themselves.
379                if !is_live {
380                    continue;
381                }
382
383                timeline_controller
384                    .handle_remote_events_with_diffs(
385                        diffs,
386                        match origin {
387                            EventsOrigin::Sync => RemoteEventOrigin::Sync,
388                            EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
389                            EventsOrigin::Cache => RemoteEventOrigin::Cache,
390                        },
391                    )
392                    .await;
393
394                if matches!(origin, EventsOrigin::Cache) {
395                    timeline_controller.retry_event_decryption(None).await;
396                }
397            }
398
399            RoomEventCacheUpdate::AddEphemeralEvents { events } => {
400                trace!("Received new ephemeral events from sync.");
401
402                // TODO: (bnjbvr) ephemeral should be handled by the event cache.
403                timeline_controller.handle_ephemeral_events(events).await;
404            }
405
406            RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
407                if !ambiguity_changes.is_empty() {
408                    let member_ambiguity_changes = ambiguity_changes
409                        .values()
410                        .flat_map(|change| change.user_ids())
411                        .collect::<BTreeSet<_>>();
412                    timeline_controller
413                        .force_update_sender_profiles(&member_ambiguity_changes)
414                        .await;
415                }
416            }
417        }
418    }
419}
420
421/// The task that handles the [`RoomSendQueueUpdate`]s.
422async fn room_send_queue_update_task(
423    mut send_queue_stream: Receiver<RoomSendQueueUpdate>,
424    timeline_controller: TimelineController,
425) {
426    trace!("spawned the local echo task!");
427
428    loop {
429        match send_queue_stream.recv().await {
430            Ok(update) => timeline_controller.handle_room_send_queue_update(update).await,
431
432            Err(RecvError::Lagged(num_missed)) => {
433                warn!("missed {num_missed} local echoes, ignoring those missed");
434            }
435
436            Err(RecvError::Closed) => {
437                trace!("channel closed, exiting the local echo handler");
438                break;
439            }
440        }
441    }
442}
443
444/// The task that handles the room keys from backups.
445async fn room_keys_from_backups_task<S>(stream: S, timeline_controller: TimelineController)
446where
447    S: Stream<Item = Result<BTreeMap<String, BTreeSet<String>>, BroadcastStreamRecvError>>,
448{
449    pin_mut!(stream);
450
451    while let Some(update) = stream.next().await {
452        match update {
453            Ok(info) => {
454                let mut session_ids = BTreeSet::new();
455
456                for set in info.into_values() {
457                    session_ids.extend(set);
458                }
459
460                timeline_controller.retry_event_decryption(Some(session_ids)).await;
461            }
462            // We lagged, so retry every event.
463            Err(_) => timeline_controller.retry_event_decryption(None).await,
464        }
465    }
466}
467
468/// The task that handles the [`BackupState`] updates.
469async fn backup_states_task<S>(backup_states_stream: S, timeline_controller: TimelineController)
470where
471    S: Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
472{
473    pin_mut!(backup_states_stream);
474
475    while let Some(update) = backup_states_stream.next().await {
476        match update {
477            // If the backup got enabled, or we lagged and thus missed that the backup
478            // might be enabled, retry to decrypt all the events. Please note, depending
479            // on the backup download strategy, this might do two things under the
480            // assumption that the backup contains the relevant room keys:
481            //
482            // 1. It will decrypt the events, if `BackupDownloadStrategy` has been set to `OneShot`.
483            // 2. It will fail to decrypt the event, but try to download the room key to decrypt it
484            //    if the `BackupDownloadStrategy` has been set to `AfterDecryptionFailure`.
485            Ok(BackupState::Enabled) | Err(_) => {
486                timeline_controller.retry_event_decryption(None).await;
487            }
488            // The other states aren't interesting since they are either still enabling
489            // the backup or have the backup in the disabled state.
490            Ok(
491                BackupState::Unknown
492                | BackupState::Creating
493                | BackupState::Resuming
494                | BackupState::Disabling
495                | BackupState::Downloading
496                | BackupState::Enabling,
497            ) => (),
498        }
499    }
500}
501
502/// The task that handles the [`RoomKeyInfo`] updates.
503async fn room_key_received_task<S>(
504    room_keys_received_stream: S,
505    timeline_controller: TimelineController,
506) where
507    S: Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>,
508{
509    pin_mut!(room_keys_received_stream);
510
511    let room_id = timeline_controller.room().room_id();
512
513    while let Some(room_keys) = room_keys_received_stream.next().await {
514        let session_ids = match room_keys {
515            Ok(room_keys) => {
516                let session_ids: BTreeSet<String> = room_keys
517                    .into_iter()
518                    .filter(|info| info.room_id == room_id)
519                    .map(|info| info.session_id)
520                    .collect();
521
522                Some(session_ids)
523            }
524            Err(BroadcastStreamRecvError::Lagged(missed_updates)) => {
525                // We lagged, let's retry to decrypt anything we have, maybe something
526                // was received.
527                warn!(
528                    missed_updates,
529                    "The room keys stream has lagged, retrying to decrypt the whole timeline"
530                );
531
532                None
533            }
534        };
535
536        timeline_controller.retry_event_decryption(session_ids).await;
537    }
538}