matrix_sdk_ui/timeline/
builder.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, sync::Arc};
16
17use futures_util::{pin_mut, StreamExt};
18use matrix_sdk::{
19    encryption::backups::BackupState,
20    event_cache::{EventsOrigin, RoomEventCacheUpdate},
21    executor::spawn,
22    Room,
23};
24use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
25use tokio::sync::broadcast::error::RecvError;
26use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
27use tracing::{info, info_span, trace, warn, Instrument, Span};
28
29use super::{
30    controller::{TimelineController, TimelineSettings},
31    to_device::{handle_forwarded_room_key_event, handle_room_key_event},
32    DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
33};
34use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager};
35
/// Builder that allows creating and configuring various parts of a
/// [`Timeline`].
///
/// Every configuration method consumes the builder and returns it, so calls
/// can be chained; the timeline itself is only created by
/// [`TimelineBuilder::build`].
#[must_use]
#[derive(Debug)]
pub struct TimelineBuilder {
    /// The room the timeline is built for.
    room: Room,
    /// The settings handed over to the timeline controller when building.
    settings: TimelineSettings,
    /// The focus the timeline starts with.
    focus: TimelineFocus,

    /// An optional hook to call whenever we run into an unable-to-decrypt or a
    /// late-decryption event.
    unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,

    /// An optional prefix for internal IDs.
    internal_id_prefix: Option<String>,
}
52
53impl TimelineBuilder {
54    pub(super) fn new(room: &Room) -> Self {
55        Self {
56            room: room.clone(),
57            settings: TimelineSettings::default(),
58            unable_to_decrypt_hook: None,
59            focus: TimelineFocus::Live,
60            internal_id_prefix: None,
61        }
62    }
63
64    /// Sets up the initial focus for this timeline.
65    ///
66    /// This can be changed later on while the timeline is alive.
67    pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
68        self.focus = focus;
69        self
70    }
71
72    /// Sets up a hook to catch unable-to-decrypt (UTD) events for the timeline
73    /// we're building.
74    ///
75    /// If it was previously set before, will overwrite the previous one.
76    pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
77        self.unable_to_decrypt_hook = Some(hook);
78        self
79    }
80
81    /// Sets the internal id prefix for this timeline.
82    ///
83    /// The prefix will be prepended to any internal ID using when generating
84    /// timeline IDs for this timeline.
85    pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
86        self.internal_id_prefix = Some(prefix);
87        self
88    }
89
90    /// Chose when to insert the date separators, either in between each day
91    /// or each month.
92    pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
93        self.settings.date_divider_mode = mode;
94        self
95    }
96
97    /// Enable tracking of the fully-read marker and the read receipts on the
98    /// timeline.
99    pub fn track_read_marker_and_receipts(mut self) -> Self {
100        self.settings.track_read_receipts = true;
101        self
102    }
103
104    /// Use the given filter to choose whether to add events to the timeline.
105    ///
106    /// # Arguments
107    ///
108    /// * `filter` - A function that takes a deserialized event, and should
109    ///   return `true` if the event should be added to the `Timeline`.
110    ///
111    /// If this is not overridden, the timeline uses the default filter that
112    /// only allows events that are materialized into a `Timeline` item. For
113    /// instance, reactions and edits don't get their own timeline item (as
114    /// they affect another existing one), so they're "filtered out" to
115    /// reflect that.
116    ///
117    /// You can use the default event filter with
118    /// [`crate::timeline::default_event_filter`] so as to chain it with
119    /// your own event filter, if you want to avoid situations where a read
120    /// receipt would be attached to an event that doesn't get its own
121    /// timeline item.
122    ///
123    /// Note that currently:
124    ///
125    /// - Not all event types have a representation as a `TimelineItem` so these
126    ///   are not added no matter what the filter returns.
127    /// - It is not possible to filter out `m.room.encrypted` events (otherwise
128    ///   they couldn't be decrypted when the appropriate room key arrives).
129    pub fn event_filter<F>(mut self, filter: F) -> Self
130    where
131        F: Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool + Send + Sync + 'static,
132    {
133        self.settings.event_filter = Arc::new(filter);
134        self
135    }
136
137    /// Whether to add events that failed to deserialize to the timeline.
138    ///
139    /// Defaults to `true`.
140    pub fn add_failed_to_parse(mut self, add: bool) -> Self {
141        self.settings.add_failed_to_parse = add;
142        self
143    }
144
145    /// Create a [`Timeline`] with the options set on this builder.
146    #[tracing::instrument(
147        skip(self),
148        fields(
149            room_id = ?self.room.room_id(),
150            track_read_receipts = self.settings.track_read_receipts,
151        )
152    )]
153    pub async fn build(self) -> Result<Timeline, Error> {
154        let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;
155
156        let client = room.client();
157        let event_cache = client.event_cache();
158
159        // Subscribe the event cache to sync responses, in case we hadn't done it yet.
160        event_cache.subscribe()?;
161
162        let (room_event_cache, event_cache_drop) = room.event_cache().await?;
163        let (_, mut event_subscriber) = room_event_cache.subscribe().await;
164
165        let is_live = matches!(focus, TimelineFocus::Live);
166        let is_pinned_events = matches!(focus, TimelineFocus::PinnedEvents { .. });
167        let is_room_encrypted = room.is_encrypted().await.ok().unwrap_or_default();
168
169        let controller = TimelineController::new(
170            room,
171            focus.clone(),
172            internal_id_prefix.clone(),
173            unable_to_decrypt_hook,
174            is_room_encrypted,
175        )
176        .with_settings(settings);
177
178        let has_events = controller.init_focus(&room_event_cache).await?;
179
180        let room = controller.room();
181        let client = room.client();
182
183        let pinned_events_join_handle = if is_pinned_events {
184            let mut pinned_event_ids_stream = room.pinned_event_ids_stream();
185            Some(spawn({
186                let inner = controller.clone();
187                async move {
188                    while pinned_event_ids_stream.next().await.is_some() {
189                        if let Ok(events) = inner.reload_pinned_events().await {
190                            inner
191                                .replace_with_initial_remote_events(
192                                    events.into_iter(),
193                                    RemoteEventOrigin::Pagination,
194                                )
195                                .await;
196                        }
197                    }
198                }
199            }))
200        } else {
201            None
202        };
203
204        let encryption_changes_handle = spawn({
205            let inner = controller.clone();
206            async move {
207                inner.handle_encryption_state_changes().await;
208            }
209        });
210
211        let room_update_join_handle = spawn({
212            let room_event_cache = room_event_cache.clone();
213            let inner = controller.clone();
214
215            let span = info_span!(
216                parent: Span::none(),
217                "live_update_handler",
218                room_id = ?room.room_id(),
219                focus = focus.debug_string(),
220                prefix = internal_id_prefix
221            );
222            span.follows_from(Span::current());
223
224            async move {
225                trace!("Spawned the event subscriber task.");
226
227                loop {
228                    trace!("Waiting for an event.");
229
230                    let update = match event_subscriber.recv().await {
231                        Ok(up) => up,
232                        Err(RecvError::Closed) => break,
233                        Err(RecvError::Lagged(num_skipped)) => {
234                            warn!(
235                                num_skipped,
236                                "Lagged behind event cache updates, resetting timeline"
237                            );
238
239                            // The updates might have lagged, but the room event cache might have
240                            // events, so retrieve them and add them back again to the timeline,
241                            // after clearing it.
242                            //
243                            // If we can't get a handle on the room cache's events, just clear the
244                            // current timeline.
245                            let (initial_events, _stream) = room_event_cache.subscribe().await;
246
247                            inner
248                                .replace_with_initial_remote_events(
249                                    initial_events.into_iter(),
250                                    RemoteEventOrigin::Sync,
251                                )
252                                .await;
253
254                            continue;
255                        }
256                    };
257
258                    match update {
259                        RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
260                            trace!(target = %event_id, "Handling fully read marker.");
261                            inner.handle_fully_read_marker(event_id).await;
262                        }
263
264                        RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
265                            trace!("Received new timeline events diffs");
266
267                            // We shouldn't use the general way of adding events to timelines to
268                            // non-live timelines, such as pinned events or focused timeline.
269                            // These timelines should handle any live updates by themselves.
270                            if !is_live {
271                                continue;
272                            }
273
274                            inner
275                                .handle_remote_events_with_diffs(
276                                    diffs,
277                                    match origin {
278                                        EventsOrigin::Sync => RemoteEventOrigin::Sync,
279                                        EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
280                                    },
281                                )
282                                .await;
283                        }
284
285                        RoomEventCacheUpdate::AddEphemeralEvents { events } => {
286                            trace!("Received new ephemeral events from sync.");
287
288                            // TODO: (bnjbvr) ephemeral should be handled by the event cache.
289                            inner.handle_ephemeral_events(events).await;
290                        }
291
292                        RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
293                            if !ambiguity_changes.is_empty() {
294                                let member_ambiguity_changes = ambiguity_changes
295                                    .values()
296                                    .flat_map(|change| change.user_ids())
297                                    .collect::<BTreeSet<_>>();
298                                inner.force_update_sender_profiles(&member_ambiguity_changes).await;
299                            }
300                        }
301                    }
302                }
303            }
304            .instrument(span)
305        });
306
307        let local_echo_listener_handle = {
308            let timeline = controller.clone();
309            let (local_echoes, mut listener) = room.send_queue().subscribe().await?;
310
311            spawn({
312                // Handles existing local echoes first.
313                for echo in local_echoes {
314                    timeline.handle_local_echo(echo).await;
315                }
316
317                let span = info_span!(
318                    parent: Span::none(),
319                    "local_echo_handler",
320                    room_id = ?room.room_id(),
321                    focus = focus.debug_string(),
322                    prefix = internal_id_prefix
323                );
324                span.follows_from(Span::current());
325
326                // React to future local echoes too.
327                async move {
328                    info!("spawned the local echo handler!");
329
330                    loop {
331                        match listener.recv().await {
332                            Ok(update) => timeline.handle_room_send_queue_update(update).await,
333
334                            Err(RecvError::Lagged(num_missed)) => {
335                                warn!("missed {num_missed} local echoes, ignoring those missed");
336                            }
337
338                            Err(RecvError::Closed) => {
339                                info!("channel closed, exiting the local echo handler");
340                                break;
341                            }
342                        }
343                    }
344                }
345                .instrument(span)
346            })
347        };
348
349        // Not using room.add_event_handler here because RoomKey events are
350        // to-device events that are not received in the context of a room.
351
352        let room_key_handle = client.add_event_handler(handle_room_key_event(
353            controller.clone(),
354            room.room_id().to_owned(),
355        ));
356
357        let forwarded_room_key_handle = client.add_event_handler(handle_forwarded_room_key_event(
358            controller.clone(),
359            room.room_id().to_owned(),
360        ));
361
362        let handles = vec![room_key_handle, forwarded_room_key_handle];
363
364        let room_key_from_backups_join_handle = {
365            let inner = controller.clone();
366            let room_id = inner.room().room_id();
367
368            let stream = client.encryption().backups().room_keys_for_room_stream(room_id);
369
370            spawn(async move {
371                pin_mut!(stream);
372
373                while let Some(update) = stream.next().await {
374                    let room = inner.room();
375
376                    match update {
377                        Ok(info) => {
378                            let mut session_ids = BTreeSet::new();
379
380                            for set in info.into_values() {
381                                session_ids.extend(set);
382                            }
383
384                            inner.retry_event_decryption(room, Some(session_ids)).await;
385                        }
386                        // We lagged, so retry every event.
387                        Err(_) => inner.retry_event_decryption(room, None).await,
388                    }
389                }
390            })
391        };
392
393        let room_key_backup_enabled_join_handle = {
394            let inner = controller.clone();
395            let stream = client.encryption().backups().state_stream();
396
397            spawn(async move {
398                pin_mut!(stream);
399
400                while let Some(update) = stream.next().await {
401                    match update {
402                        // If the backup got enabled, or we lagged and thus missed that the backup
403                        // might be enabled, retry to decrypt all the events. Please note, depending
404                        // on the backup download strategy, this might do two things under the
405                        // assumption that the backup contains the relevant room keys:
406                        //
407                        // 1. It will decrypt the events, if `BackupDownloadStrategy` has been set
408                        //    to `OneShot`.
409                        // 2. It will fail to decrypt the event, but try to download the room key to
410                        //    decrypt it if the `BackupDownloadStrategy` has been set to
411                        //    `AfterDecryptionFailure`.
412                        Ok(BackupState::Enabled) | Err(_) => {
413                            let room = inner.room();
414                            inner.retry_event_decryption(room, None).await;
415                        }
416                        // The other states aren't interesting since they are either still enabling
417                        // the backup or have the backup in the disabled state.
418                        Ok(
419                            BackupState::Unknown
420                            | BackupState::Creating
421                            | BackupState::Resuming
422                            | BackupState::Disabling
423                            | BackupState::Downloading
424                            | BackupState::Enabling,
425                        ) => (),
426                    }
427                }
428            })
429        };
430
431        // TODO: Technically, this should be the only stream we need to listen to get
432        // notified when we should retry to decrypt an event. We sadly can't do that,
433        // since the cross-process support kills the `OlmMachine` which then in
434        // turn kills this stream. Once this is solved remove all the other ways we
435        // listen for room keys.
436        let room_keys_received_join_handle = {
437            let inner = controller.clone();
438            let stream = client.encryption().room_keys_received_stream().await.expect(
439                "We should be logged in by now, so we should have access to an OlmMachine \
440                 to be able to listen to this stream",
441            );
442
443            spawn(async move {
444                pin_mut!(stream);
445
446                while let Some(room_keys) = stream.next().await {
447                    let session_ids = match room_keys {
448                        Ok(room_keys) => {
449                            let session_ids: BTreeSet<String> = room_keys
450                                .into_iter()
451                                .filter(|info| info.room_id == inner.room().room_id())
452                                .map(|info| info.session_id)
453                                .collect();
454
455                            Some(session_ids)
456                        }
457                        Err(BroadcastStreamRecvError::Lagged(missed_updates)) => {
458                            // We lagged, let's retry to decrypt anything we have, maybe something
459                            // was received.
460                            warn!(missed_updates, "The room keys stream has lagged, retrying to decrypt the whole timeline");
461
462                            None
463                        }
464                    };
465
466                    let room = inner.room();
467                    inner.retry_event_decryption(room, session_ids).await;
468                }
469            })
470        };
471
472        let timeline = Timeline {
473            controller,
474            event_cache: room_event_cache,
475            drop_handle: Arc::new(TimelineDropHandle {
476                client,
477                event_handler_handles: handles,
478                room_update_join_handle,
479                pinned_events_join_handle,
480                room_key_from_backups_join_handle,
481                room_key_backup_enabled_join_handle,
482                room_keys_received_join_handle,
483                local_echo_listener_handle,
484                _event_cache_drop_handle: event_cache_drop,
485                encryption_changes_handle,
486            }),
487        };
488
489        if has_events {
490            // The events we're injecting might be encrypted events, but we might
491            // have received the room key to decrypt them while nobody was listening to the
492            // `m.room_key` event, let's retry now.
493            timeline.retry_decryption_for_all_events().await;
494        }
495
496        Ok(timeline)
497    }
498}