Skip to main content

matrix_sdk_ui/timeline/
builder.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use matrix_sdk::Room;
18use matrix_sdk_base::{SendOutsideWasm, SyncOutsideWasm};
19use ruma::{events::AnySyncTimelineEvent, room_version_rules::RoomVersionRules};
20use tracing::{Instrument, Span, info_span};
21
22use super::{
23    DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
24    controller::{TimelineController, TimelineSettings},
25};
26use crate::{
27    timeline::{
28        PaginationError, TimelineReadReceiptTracking,
29        controller::spawn_crypto_tasks,
30        tasks::{
31            event_focused_task, pinned_events_task, room_event_cache_updates_task,
32            room_send_queue_update_task, thread_updates_task,
33        },
34    },
35    unable_to_decrypt_hook::UtdHookManager,
36};
37
38/// Builder that allows creating and configuring various parts of a
39/// [`Timeline`].
40#[must_use]
41#[derive(Debug)]
42pub struct TimelineBuilder {
43    room: Room,
44    settings: TimelineSettings,
45    focus: TimelineFocus,
46
47    /// An optional hook to call whenever we run into an unable-to-decrypt or a
48    /// late-decryption event.
49    unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
50
51    /// An optional prefix for internal IDs.
52    internal_id_prefix: Option<String>,
53}
54
55impl TimelineBuilder {
56    pub fn new(room: &Room) -> Self {
57        Self {
58            room: room.clone(),
59            settings: TimelineSettings::default(),
60            unable_to_decrypt_hook: None,
61            focus: TimelineFocus::Live { hide_threaded_events: false },
62            internal_id_prefix: None,
63        }
64    }
65
66    /// Sets up the initial focus for this timeline.
67    ///
68    /// By default, the focus for a timeline is to be "live" (i.e. it will
69    /// listen to sync and append this room's events in real-time, and it'll be
70    /// able to back-paginate older events), and show all events (including
71    /// events in threads). Look at [`TimelineFocus`] for other options.
72    pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
73        self.focus = focus;
74        self
75    }
76
77    /// Sets up a hook to catch unable-to-decrypt (UTD) events for the timeline
78    /// we're building.
79    ///
80    /// If it was previously set before, will overwrite the previous one.
81    pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
82        self.unable_to_decrypt_hook = Some(hook);
83        self
84    }
85
86    /// Sets the internal id prefix for this timeline.
87    ///
88    /// The prefix will be prepended to any internal ID using when generating
89    /// timeline IDs for this timeline.
90    pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
91        self.internal_id_prefix = Some(prefix);
92        self
93    }
94
95    /// Choose when to insert the date separators, either in between each day
96    /// or each month.
97    pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
98        self.settings.date_divider_mode = mode;
99        self
100    }
101
102    /// Choose whether to enable tracking of the fully-read marker and the read
103    /// receipts and on which event types.
104    pub fn track_read_marker_and_receipts(mut self, tracking: TimelineReadReceiptTracking) -> Self {
105        self.settings.track_read_receipts = tracking;
106        self
107    }
108
109    /// Use the given filter to choose whether to add events to the timeline.
110    ///
111    /// # Arguments
112    ///
113    /// * `filter` - A function that takes a deserialized event, and should
114    ///   return `true` if the event should be added to the `Timeline`.
115    ///
116    /// If this is not overridden, the timeline uses the default filter that
117    /// only allows events that are materialized into a `Timeline` item. For
118    /// instance, reactions and edits don't get their own timeline item (as
119    /// they affect another existing one), so they're "filtered out" to
120    /// reflect that.
121    ///
122    /// You can use the default event filter with
123    /// [`crate::timeline::default_event_filter`] so as to chain it with
124    /// your own event filter, if you want to avoid situations where a read
125    /// receipt would be attached to an event that doesn't get its own
126    /// timeline item.
127    ///
128    /// Note that currently:
129    ///
130    /// - Not all event types have a representation as a `TimelineItem` so these
131    ///   are not added no matter what the filter returns.
132    /// - It is not possible to filter out `m.room.encrypted` events (otherwise
133    ///   they couldn't be decrypted when the appropriate room key arrives).
134    pub fn event_filter<F>(mut self, filter: F) -> Self
135    where
136        F: Fn(&AnySyncTimelineEvent, &RoomVersionRules) -> bool
137            + SendOutsideWasm
138            + SyncOutsideWasm
139            + 'static,
140    {
141        self.settings.event_filter = Arc::new(filter);
142        self
143    }
144
145    /// Whether to add events that failed to deserialize to the timeline.
146    ///
147    /// Defaults to `true`.
148    pub fn add_failed_to_parse(mut self, add: bool) -> Self {
149        self.settings.add_failed_to_parse = add;
150        self
151    }
152
153    /// Create a [`Timeline`] with the options set on this builder.
154    #[tracing::instrument(
155        skip(self),
156        fields(
157            room_id = ?self.room.room_id(),
158            track_read_receipts = ?self.settings.track_read_receipts,
159        )
160    )]
161    pub async fn build(self) -> Result<Timeline, Error> {
162        let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;
163
164        // Subscribe the event cache to sync responses, in case we hadn't done it yet.
165        room.client().event_cache().subscribe()?;
166
167        let (room_event_cache, event_cache_drop) = room.event_cache().await?;
168        let (_, event_subscriber) = room_event_cache.subscribe().await?;
169
170        let is_room_encrypted = room
171            .latest_encryption_state()
172            .await
173            .map(|state| state.is_encrypted())
174            .ok()
175            .unwrap_or_default();
176
177        let controller = TimelineController::new(
178            room.clone(),
179            focus.clone(),
180            internal_id_prefix.clone(),
181            unable_to_decrypt_hook,
182            is_room_encrypted,
183            settings,
184        );
185
186        let has_events = controller.init_focus(&focus, &room_event_cache).await?;
187
188        let pinned_events_join_handle = if matches!(focus, TimelineFocus::PinnedEvents) {
189            let (_initial_events, pinned_events_recv) =
190                room_event_cache.subscribe_to_pinned_events().await?;
191
192            Some(
193                room.client()
194                    .task_monitor()
195                    .spawn_background_task(
196                        "timeline::pinned_event_cache_updates",
197                        pinned_events_task(
198                            room_event_cache.clone(),
199                            controller.clone(),
200                            pinned_events_recv,
201                        ),
202                    )
203                    .abort_on_drop(),
204            )
205        } else {
206            None
207        };
208
209        let event_focused_join_handle =
210            if let TimelineFocus::Event { target, thread_mode, .. } = &focus {
211                let cache = room_event_cache
212                    .get_event_focused_cache(target.clone(), (*thread_mode).into())
213                    .await?
214                    .ok_or(Error::PaginationError(PaginationError::MissingCache))?;
215
216                let (_initial_events, recv) = cache.subscribe().await;
217
218                Some(room.client().task_monitor().spawn_background_task(
219                    "timeline::event_focused_cache_updates",
220                    event_focused_task(
221                        target.clone(),
222                        (*thread_mode).into(),
223                        room_event_cache.clone(),
224                        controller.clone(),
225                        recv,
226                    ),
227                ))
228            } else {
229                None
230            };
231
232        let room_update_join_handle = room
233            .client()
234            .task_monitor()
235            .spawn_background_task("timeline::room_event_cache_updates", {
236                let span = info_span!(
237                    parent: Span::none(),
238                    "live_update_handler",
239                    room_id = ?room.room_id(),
240                    focus = focus.debug_string(),
241                    prefix = internal_id_prefix
242                );
243                span.follows_from(Span::current());
244
245                room_event_cache_updates_task(
246                    room_event_cache.clone(),
247                    controller.clone(),
248                    event_subscriber,
249                    focus.clone(),
250                )
251                .instrument(span)
252            })
253            .abort_on_drop();
254
255        let thread_update_join_handle =
256            if let TimelineFocus::Thread { root_event_id: root } = &focus {
257                Some({
258                    let span = info_span!(
259                        parent: Span::none(),
260                        "thread_live_update_handler",
261                        room_id = ?room.room_id(),
262                        focus = focus.debug_string(),
263                        prefix = internal_id_prefix
264                    );
265                    span.follows_from(Span::current());
266
267                    // Note: must be done here *before* spawning the task, to avoid race conditions
268                    // with event cache updates happening in the background.
269                    let (_events, receiver) =
270                        room_event_cache.subscribe_to_thread(root.clone()).await?;
271
272                    room.client()
273                        .task_monitor()
274                        .spawn_background_task(
275                            "timeline::thread_event_cache_updates",
276                            thread_updates_task(
277                                receiver,
278                                room_event_cache.clone(),
279                                controller.clone(),
280                                root.clone(),
281                            )
282                            .instrument(span),
283                        )
284                        .abort_on_drop()
285                })
286            } else {
287                None
288            };
289
290        let local_echo_listener_handle = {
291            let timeline_controller = controller.clone();
292            let (local_echoes, send_queue_stream) = room.send_queue().subscribe().await?;
293
294            room.client().task_monitor().spawn_background_task("timeline::local_echo_listener", {
295                // Handles existing local echoes first.
296                for echo in local_echoes {
297                    timeline_controller.handle_local_echo(echo).await;
298                }
299
300                let span = info_span!(
301                    parent: Span::none(),
302                    "local_echo_handler",
303                    room_id = ?room.room_id(),
304                    focus = focus.debug_string(),
305                    prefix = internal_id_prefix
306                );
307                span.follows_from(Span::current());
308
309                room_send_queue_update_task(send_queue_stream, timeline_controller).instrument(span)
310            })
311        };
312
313        let crypto_drop_handles = spawn_crypto_tasks(controller.clone()).await;
314
315        let timeline = Timeline {
316            controller,
317            event_cache: room_event_cache,
318            drop_handle: Arc::new(TimelineDropHandle {
319                _crypto_drop_handles: crypto_drop_handles,
320                _room_update_join_handle: room_update_join_handle,
321                _thread_update_join_handle: thread_update_join_handle,
322                _pinned_events_join_handle: pinned_events_join_handle,
323                _local_echo_listener_handle: local_echo_listener_handle,
324                _event_focused_join_handle: event_focused_join_handle,
325                _event_cache_drop_handle: event_cache_drop,
326            }),
327        };
328
329        if has_events {
330            // The events we're injecting might be encrypted events, but we might
331            // have received the room key to decrypt them while nobody was listening to the
332            // `m.room_key` event, let's retry now.
333            timeline.retry_decryption_for_all_events().await;
334        }
335
336        Ok(timeline)
337    }
338}