matrix_sdk_ui/timeline/builder.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use matrix_sdk::{Room, executor::spawn};
18use matrix_sdk_base::{SendOutsideWasm, SyncOutsideWasm};
19use ruma::{events::AnySyncTimelineEvent, room_version_rules::RoomVersionRules};
20use tracing::{Instrument, Span, info_span};
21
22use super::{
23 DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
24 controller::{TimelineController, TimelineSettings},
25};
26use crate::{
27 timeline::{
28 controller::spawn_crypto_tasks,
29 tasks::{
30 pinned_events_task, room_event_cache_updates_task, room_send_queue_update_task,
31 thread_updates_task,
32 },
33 },
34 unable_to_decrypt_hook::UtdHookManager,
35};
36
37/// Builder that allows creating and configuring various parts of a
38/// [`Timeline`].
39#[must_use]
40#[derive(Debug)]
41pub struct TimelineBuilder {
42 room: Room,
43 settings: TimelineSettings,
44 focus: TimelineFocus,
45
46 /// An optional hook to call whenever we run into an unable-to-decrypt or a
47 /// late-decryption event.
48 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
49
50 /// An optional prefix for internal IDs.
51 internal_id_prefix: Option<String>,
52}
53
54impl TimelineBuilder {
55 pub fn new(room: &Room) -> Self {
56 Self {
57 room: room.clone(),
58 settings: TimelineSettings::default(),
59 unable_to_decrypt_hook: None,
60 focus: TimelineFocus::Live { hide_threaded_events: false },
61 internal_id_prefix: None,
62 }
63 }
64
65 /// Sets up the initial focus for this timeline.
66 ///
67 /// By default, the focus for a timeline is to be "live" (i.e. it will
68 /// listen to sync and append this room's events in real-time, and it'll be
69 /// able to back-paginate older events), and show all events (including
70 /// events in threads). Look at [`TimelineFocus`] for other options.
71 pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
72 self.focus = focus;
73 self
74 }
75
76 /// Sets up a hook to catch unable-to-decrypt (UTD) events for the timeline
77 /// we're building.
78 ///
79 /// If it was previously set before, will overwrite the previous one.
80 pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
81 self.unable_to_decrypt_hook = Some(hook);
82 self
83 }
84
85 /// Sets the internal id prefix for this timeline.
86 ///
87 /// The prefix will be prepended to any internal ID using when generating
88 /// timeline IDs for this timeline.
89 pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
90 self.internal_id_prefix = Some(prefix);
91 self
92 }
93
94 /// Chose when to insert the date separators, either in between each day
95 /// or each month.
96 pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
97 self.settings.date_divider_mode = mode;
98 self
99 }
100
101 /// Enable tracking of the fully-read marker and the read receipts on the
102 /// timeline.
103 pub fn track_read_marker_and_receipts(mut self) -> Self {
104 self.settings.track_read_receipts = true;
105 self
106 }
107
108 /// Use the given filter to choose whether to add events to the timeline.
109 ///
110 /// # Arguments
111 ///
112 /// * `filter` - A function that takes a deserialized event, and should
113 /// return `true` if the event should be added to the `Timeline`.
114 ///
115 /// If this is not overridden, the timeline uses the default filter that
116 /// only allows events that are materialized into a `Timeline` item. For
117 /// instance, reactions and edits don't get their own timeline item (as
118 /// they affect another existing one), so they're "filtered out" to
119 /// reflect that.
120 ///
121 /// You can use the default event filter with
122 /// [`crate::timeline::default_event_filter`] so as to chain it with
123 /// your own event filter, if you want to avoid situations where a read
124 /// receipt would be attached to an event that doesn't get its own
125 /// timeline item.
126 ///
127 /// Note that currently:
128 ///
129 /// - Not all event types have a representation as a `TimelineItem` so these
130 /// are not added no matter what the filter returns.
131 /// - It is not possible to filter out `m.room.encrypted` events (otherwise
132 /// they couldn't be decrypted when the appropriate room key arrives).
133 pub fn event_filter<F>(mut self, filter: F) -> Self
134 where
135 F: Fn(&AnySyncTimelineEvent, &RoomVersionRules) -> bool
136 + SendOutsideWasm
137 + SyncOutsideWasm
138 + 'static,
139 {
140 self.settings.event_filter = Arc::new(filter);
141 self
142 }
143
144 /// Whether to add events that failed to deserialize to the timeline.
145 ///
146 /// Defaults to `true`.
147 pub fn add_failed_to_parse(mut self, add: bool) -> Self {
148 self.settings.add_failed_to_parse = add;
149 self
150 }
151
152 /// Create a [`Timeline`] with the options set on this builder.
153 #[tracing::instrument(
154 skip(self),
155 fields(
156 room_id = ?self.room.room_id(),
157 track_read_receipts = self.settings.track_read_receipts,
158 )
159 )]
160 pub async fn build(self) -> Result<Timeline, Error> {
161 let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;
162
163 // Subscribe the event cache to sync responses, in case we hadn't done it yet.
164 room.client().event_cache().subscribe()?;
165
166 let (room_event_cache, event_cache_drop) = room.event_cache().await?;
167 let (_, event_subscriber) = room_event_cache.subscribe().await;
168
169 let is_room_encrypted = room
170 .latest_encryption_state()
171 .await
172 .map(|state| state.is_encrypted())
173 .ok()
174 .unwrap_or_default();
175
176 let controller = TimelineController::new(
177 room.clone(),
178 focus.clone(),
179 internal_id_prefix.clone(),
180 unable_to_decrypt_hook,
181 is_room_encrypted,
182 settings,
183 );
184
185 let has_events = controller.init_focus(&focus, &room_event_cache).await?;
186
187 let pinned_events_join_handle = if matches!(focus, TimelineFocus::PinnedEvents { .. }) {
188 Some(spawn(pinned_events_task(room.pinned_event_ids_stream(), controller.clone())))
189 } else {
190 None
191 };
192
193 let room_update_join_handle = spawn({
194 let span = info_span!(
195 parent: Span::none(),
196 "live_update_handler",
197 room_id = ?room.room_id(),
198 focus = focus.debug_string(),
199 prefix = internal_id_prefix
200 );
201 span.follows_from(Span::current());
202
203 room_event_cache_updates_task(
204 room_event_cache.clone(),
205 controller.clone(),
206 event_subscriber,
207 focus.clone(),
208 )
209 .instrument(span)
210 });
211
212 let thread_update_join_handle = if let TimelineFocus::Thread { root_event_id: root } =
213 &focus
214 {
215 Some({
216 let span = info_span!(
217 parent: Span::none(),
218 "thread_live_update_handler",
219 room_id = ?room.room_id(),
220 focus = focus.debug_string(),
221 prefix = internal_id_prefix
222 );
223 span.follows_from(Span::current());
224
225 // Note: must be done here *before* spawning the task, to avoid race conditions
226 // with event cache updates happening in the background.
227 let (_events, receiver) = room_event_cache.subscribe_to_thread(root.clone()).await;
228
229 spawn(
230 thread_updates_task(
231 receiver,
232 room_event_cache.clone(),
233 controller.clone(),
234 root.clone(),
235 )
236 .instrument(span),
237 )
238 })
239 } else {
240 None
241 };
242
243 let local_echo_listener_handle = {
244 let timeline_controller = controller.clone();
245 let (local_echoes, send_queue_stream) = room.send_queue().subscribe().await?;
246
247 spawn({
248 // Handles existing local echoes first.
249 for echo in local_echoes {
250 timeline_controller.handle_local_echo(echo).await;
251 }
252
253 let span = info_span!(
254 parent: Span::none(),
255 "local_echo_handler",
256 room_id = ?room.room_id(),
257 focus = focus.debug_string(),
258 prefix = internal_id_prefix
259 );
260 span.follows_from(Span::current());
261
262 room_send_queue_update_task(send_queue_stream, timeline_controller).instrument(span)
263 })
264 };
265
266 let crypto_drop_handles = spawn_crypto_tasks(room, controller.clone()).await;
267
268 let timeline = Timeline {
269 controller,
270 event_cache: room_event_cache,
271 drop_handle: Arc::new(TimelineDropHandle {
272 _crypto_drop_handles: crypto_drop_handles,
273 room_update_join_handle,
274 thread_update_join_handle,
275 pinned_events_join_handle,
276 local_echo_listener_handle,
277 _event_cache_drop_handle: event_cache_drop,
278 }),
279 };
280
281 if has_events {
282 // The events we're injecting might be encrypted events, but we might
283 // have received the room key to decrypt them while nobody was listening to the
284 // `m.room_key` event, let's retry now.
285 timeline.retry_decryption_for_all_events().await;
286 }
287
288 Ok(timeline)
289 }
290}