matrix_sdk_ui/timeline/builder.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use matrix_sdk::{Room, executor::spawn};
18use matrix_sdk_base::{SendOutsideWasm, SyncOutsideWasm};
19use ruma::{events::AnySyncTimelineEvent, room_version_rules::RoomVersionRules};
20use tracing::{Instrument, Span, info_span};
21
22use super::{
23 DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
24 controller::{TimelineController, TimelineSettings},
25};
26use crate::{
27 timeline::{
28 TimelineReadReceiptTracking,
29 controller::spawn_crypto_tasks,
30 tasks::{
31 pinned_events_task, room_event_cache_updates_task, room_send_queue_update_task,
32 thread_updates_task,
33 },
34 },
35 unable_to_decrypt_hook::UtdHookManager,
36};
37
38/// Builder that allows creating and configuring various parts of a
39/// [`Timeline`].
40#[must_use]
41#[derive(Debug)]
42pub struct TimelineBuilder {
43 room: Room,
44 settings: TimelineSettings,
45 focus: TimelineFocus,
46
47 /// An optional hook to call whenever we run into an unable-to-decrypt or a
48 /// late-decryption event.
49 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
50
51 /// An optional prefix for internal IDs.
52 internal_id_prefix: Option<String>,
53}
54
55impl TimelineBuilder {
56 pub fn new(room: &Room) -> Self {
57 Self {
58 room: room.clone(),
59 settings: TimelineSettings::default(),
60 unable_to_decrypt_hook: None,
61 focus: TimelineFocus::Live { hide_threaded_events: false },
62 internal_id_prefix: None,
63 }
64 }
65
66 /// Sets up the initial focus for this timeline.
67 ///
68 /// By default, the focus for a timeline is to be "live" (i.e. it will
69 /// listen to sync and append this room's events in real-time, and it'll be
70 /// able to back-paginate older events), and show all events (including
71 /// events in threads). Look at [`TimelineFocus`] for other options.
72 pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
73 self.focus = focus;
74 self
75 }
76
77 /// Sets up a hook to catch unable-to-decrypt (UTD) events for the timeline
78 /// we're building.
79 ///
80 /// If it was previously set before, will overwrite the previous one.
81 pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
82 self.unable_to_decrypt_hook = Some(hook);
83 self
84 }
85
86 /// Sets the internal id prefix for this timeline.
87 ///
88 /// The prefix will be prepended to any internal ID using when generating
89 /// timeline IDs for this timeline.
90 pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
91 self.internal_id_prefix = Some(prefix);
92 self
93 }
94
95 /// Choose when to insert the date separators, either in between each day
96 /// or each month.
97 pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
98 self.settings.date_divider_mode = mode;
99 self
100 }
101
102 /// Choose whether to enable tracking of the fully-read marker and the read
103 /// receipts and on which event types.
104 pub fn track_read_marker_and_receipts(mut self, tracking: TimelineReadReceiptTracking) -> Self {
105 self.settings.track_read_receipts = tracking;
106 self
107 }
108
109 /// Use the given filter to choose whether to add events to the timeline.
110 ///
111 /// # Arguments
112 ///
113 /// * `filter` - A function that takes a deserialized event, and should
114 /// return `true` if the event should be added to the `Timeline`.
115 ///
116 /// If this is not overridden, the timeline uses the default filter that
117 /// only allows events that are materialized into a `Timeline` item. For
118 /// instance, reactions and edits don't get their own timeline item (as
119 /// they affect another existing one), so they're "filtered out" to
120 /// reflect that.
121 ///
122 /// You can use the default event filter with
123 /// [`crate::timeline::default_event_filter`] so as to chain it with
124 /// your own event filter, if you want to avoid situations where a read
125 /// receipt would be attached to an event that doesn't get its own
126 /// timeline item.
127 ///
128 /// Note that currently:
129 ///
130 /// - Not all event types have a representation as a `TimelineItem` so these
131 /// are not added no matter what the filter returns.
132 /// - It is not possible to filter out `m.room.encrypted` events (otherwise
133 /// they couldn't be decrypted when the appropriate room key arrives).
134 pub fn event_filter<F>(mut self, filter: F) -> Self
135 where
136 F: Fn(&AnySyncTimelineEvent, &RoomVersionRules) -> bool
137 + SendOutsideWasm
138 + SyncOutsideWasm
139 + 'static,
140 {
141 self.settings.event_filter = Arc::new(filter);
142 self
143 }
144
145 /// Whether to add events that failed to deserialize to the timeline.
146 ///
147 /// Defaults to `true`.
148 pub fn add_failed_to_parse(mut self, add: bool) -> Self {
149 self.settings.add_failed_to_parse = add;
150 self
151 }
152
153 /// Create a [`Timeline`] with the options set on this builder.
154 #[tracing::instrument(
155 skip(self),
156 fields(
157 room_id = ?self.room.room_id(),
158 track_read_receipts = ?self.settings.track_read_receipts,
159 )
160 )]
161 pub async fn build(self) -> Result<Timeline, Error> {
162 let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;
163
164 // Subscribe the event cache to sync responses, in case we hadn't done it yet.
165 room.client().event_cache().subscribe()?;
166
167 let (room_event_cache, event_cache_drop) = room.event_cache().await?;
168 let (_, event_subscriber) = room_event_cache.subscribe().await?;
169
170 let is_room_encrypted = room
171 .latest_encryption_state()
172 .await
173 .map(|state| state.is_encrypted())
174 .ok()
175 .unwrap_or_default();
176
177 let controller = TimelineController::new(
178 room.clone(),
179 focus.clone(),
180 internal_id_prefix.clone(),
181 unable_to_decrypt_hook,
182 is_room_encrypted,
183 settings,
184 );
185
186 let has_events = controller.init_focus(&focus, &room_event_cache).await?;
187
188 let pinned_events_join_handle = if matches!(focus, TimelineFocus::PinnedEvents { .. }) {
189 Some(spawn(pinned_events_task(room.pinned_event_ids_stream(), controller.clone())))
190 } else {
191 None
192 };
193
194 let room_update_join_handle = spawn({
195 let span = info_span!(
196 parent: Span::none(),
197 "live_update_handler",
198 room_id = ?room.room_id(),
199 focus = focus.debug_string(),
200 prefix = internal_id_prefix
201 );
202 span.follows_from(Span::current());
203
204 room_event_cache_updates_task(
205 room_event_cache.clone(),
206 controller.clone(),
207 event_subscriber,
208 focus.clone(),
209 )
210 .instrument(span)
211 });
212
213 let thread_update_join_handle =
214 if let TimelineFocus::Thread { root_event_id: root } = &focus {
215 Some({
216 let span = info_span!(
217 parent: Span::none(),
218 "thread_live_update_handler",
219 room_id = ?room.room_id(),
220 focus = focus.debug_string(),
221 prefix = internal_id_prefix
222 );
223 span.follows_from(Span::current());
224
225 // Note: must be done here *before* spawning the task, to avoid race conditions
226 // with event cache updates happening in the background.
227 let (_events, receiver) =
228 room_event_cache.subscribe_to_thread(root.clone()).await?;
229
230 spawn(
231 thread_updates_task(
232 receiver,
233 room_event_cache.clone(),
234 controller.clone(),
235 root.clone(),
236 )
237 .instrument(span),
238 )
239 })
240 } else {
241 None
242 };
243
244 let local_echo_listener_handle = {
245 let timeline_controller = controller.clone();
246 let (local_echoes, send_queue_stream) = room.send_queue().subscribe().await?;
247
248 spawn({
249 // Handles existing local echoes first.
250 for echo in local_echoes {
251 timeline_controller.handle_local_echo(echo).await;
252 }
253
254 let span = info_span!(
255 parent: Span::none(),
256 "local_echo_handler",
257 room_id = ?room.room_id(),
258 focus = focus.debug_string(),
259 prefix = internal_id_prefix
260 );
261 span.follows_from(Span::current());
262
263 room_send_queue_update_task(send_queue_stream, timeline_controller).instrument(span)
264 })
265 };
266
267 let crypto_drop_handles = spawn_crypto_tasks(room, controller.clone()).await;
268
269 let timeline = Timeline {
270 controller,
271 event_cache: room_event_cache,
272 drop_handle: Arc::new(TimelineDropHandle {
273 _crypto_drop_handles: crypto_drop_handles,
274 room_update_join_handle,
275 thread_update_join_handle,
276 pinned_events_join_handle,
277 local_echo_listener_handle,
278 _event_cache_drop_handle: event_cache_drop,
279 }),
280 };
281
282 if has_events {
283 // The events we're injecting might be encrypted events, but we might
284 // have received the room key to decrypt them while nobody was listening to the
285 // `m.room_key` event, let's retry now.
286 timeline.retry_decryption_for_all_events().await;
287 }
288
289 Ok(timeline)
290 }
291}