matrix_sdk_ui/timeline/builder.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, sync::Arc};
16
17use futures_util::{pin_mut, StreamExt};
18use matrix_sdk::{
19 encryption::backups::BackupState,
20 event_cache::{EventsOrigin, RoomEventCacheUpdate},
21 executor::spawn,
22 Room,
23};
24use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
25use tokio::sync::broadcast::error::RecvError;
26use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
27use tracing::{info, info_span, trace, warn, Instrument, Span};
28
29use super::{
30 controller::{TimelineController, TimelineSettings},
31 to_device::{handle_forwarded_room_key_event, handle_room_key_event},
32 DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
33};
34use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager};
35
36/// Builder that allows creating and configuring various parts of a
37/// [`Timeline`].
38#[must_use]
39#[derive(Debug)]
40pub struct TimelineBuilder {
41 room: Room,
42 settings: TimelineSettings,
43 focus: TimelineFocus,
44
45 /// An optional hook to call whenever we run into an unable-to-decrypt or a
46 /// late-decryption event.
47 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
48
49 /// An optional prefix for internal IDs.
50 internal_id_prefix: Option<String>,
51}
52
53impl TimelineBuilder {
54 pub(super) fn new(room: &Room) -> Self {
55 Self {
56 room: room.clone(),
57 settings: TimelineSettings::default(),
58 unable_to_decrypt_hook: None,
59 focus: TimelineFocus::Live,
60 internal_id_prefix: None,
61 }
62 }
63
64 /// Sets up the initial focus for this timeline.
65 ///
66 /// This can be changed later on while the timeline is alive.
67 pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
68 self.focus = focus;
69 self
70 }
71
72 /// Sets up a hook to catch unable-to-decrypt (UTD) events for the timeline
73 /// we're building.
74 ///
75 /// If it was previously set before, will overwrite the previous one.
76 pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
77 self.unable_to_decrypt_hook = Some(hook);
78 self
79 }
80
81 /// Sets the internal id prefix for this timeline.
82 ///
83 /// The prefix will be prepended to any internal ID using when generating
84 /// timeline IDs for this timeline.
85 pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
86 self.internal_id_prefix = Some(prefix);
87 self
88 }
89
90 /// Chose when to insert the date separators, either in between each day
91 /// or each month.
92 pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
93 self.settings.date_divider_mode = mode;
94 self
95 }
96
97 /// Enable tracking of the fully-read marker and the read receipts on the
98 /// timeline.
99 pub fn track_read_marker_and_receipts(mut self) -> Self {
100 self.settings.track_read_receipts = true;
101 self
102 }
103
104 /// Use the given filter to choose whether to add events to the timeline.
105 ///
106 /// # Arguments
107 ///
108 /// * `filter` - A function that takes a deserialized event, and should
109 /// return `true` if the event should be added to the `Timeline`.
110 ///
111 /// If this is not overridden, the timeline uses the default filter that
112 /// only allows events that are materialized into a `Timeline` item. For
113 /// instance, reactions and edits don't get their own timeline item (as
114 /// they affect another existing one), so they're "filtered out" to
115 /// reflect that.
116 ///
117 /// You can use the default event filter with
118 /// [`crate::timeline::default_event_filter`] so as to chain it with
119 /// your own event filter, if you want to avoid situations where a read
120 /// receipt would be attached to an event that doesn't get its own
121 /// timeline item.
122 ///
123 /// Note that currently:
124 ///
125 /// - Not all event types have a representation as a `TimelineItem` so these
126 /// are not added no matter what the filter returns.
127 /// - It is not possible to filter out `m.room.encrypted` events (otherwise
128 /// they couldn't be decrypted when the appropriate room key arrives).
129 pub fn event_filter<F>(mut self, filter: F) -> Self
130 where
131 F: Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool + Send + Sync + 'static,
132 {
133 self.settings.event_filter = Arc::new(filter);
134 self
135 }
136
137 /// Whether to add events that failed to deserialize to the timeline.
138 ///
139 /// Defaults to `true`.
140 pub fn add_failed_to_parse(mut self, add: bool) -> Self {
141 self.settings.add_failed_to_parse = add;
142 self
143 }
144
145 /// Create a [`Timeline`] with the options set on this builder.
146 #[tracing::instrument(
147 skip(self),
148 fields(
149 room_id = ?self.room.room_id(),
150 track_read_receipts = self.settings.track_read_receipts,
151 )
152 )]
153 pub async fn build(self) -> Result<Timeline, Error> {
154 let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;
155
156 let client = room.client();
157 let event_cache = client.event_cache();
158
159 // Subscribe the event cache to sync responses, in case we hadn't done it yet.
160 event_cache.subscribe()?;
161
162 let (room_event_cache, event_cache_drop) = room.event_cache().await?;
163 let (_, mut event_subscriber) = room_event_cache.subscribe().await;
164
165 let is_live = matches!(focus, TimelineFocus::Live);
166 let is_pinned_events = matches!(focus, TimelineFocus::PinnedEvents { .. });
167 let is_room_encrypted = room.is_encrypted().await.ok().unwrap_or_default();
168
169 let controller = TimelineController::new(
170 room,
171 focus.clone(),
172 internal_id_prefix.clone(),
173 unable_to_decrypt_hook,
174 is_room_encrypted,
175 )
176 .with_settings(settings);
177
178 let has_events = controller.init_focus(&room_event_cache).await?;
179
180 let room = controller.room();
181 let client = room.client();
182
183 let pinned_events_join_handle = if is_pinned_events {
184 let mut pinned_event_ids_stream = room.pinned_event_ids_stream();
185 Some(spawn({
186 let inner = controller.clone();
187 async move {
188 while pinned_event_ids_stream.next().await.is_some() {
189 if let Ok(events) = inner.reload_pinned_events().await {
190 inner
191 .replace_with_initial_remote_events(
192 events.into_iter(),
193 RemoteEventOrigin::Pagination,
194 )
195 .await;
196 }
197 }
198 }
199 }))
200 } else {
201 None
202 };
203
204 let encryption_changes_handle = spawn({
205 let inner = controller.clone();
206 async move {
207 inner.handle_encryption_state_changes().await;
208 }
209 });
210
211 let room_update_join_handle = spawn({
212 let room_event_cache = room_event_cache.clone();
213 let inner = controller.clone();
214
215 let span = info_span!(
216 parent: Span::none(),
217 "live_update_handler",
218 room_id = ?room.room_id(),
219 focus = focus.debug_string(),
220 prefix = internal_id_prefix
221 );
222 span.follows_from(Span::current());
223
224 async move {
225 trace!("Spawned the event subscriber task.");
226
227 loop {
228 trace!("Waiting for an event.");
229
230 let update = match event_subscriber.recv().await {
231 Ok(up) => up,
232 Err(RecvError::Closed) => break,
233 Err(RecvError::Lagged(num_skipped)) => {
234 warn!(
235 num_skipped,
236 "Lagged behind event cache updates, resetting timeline"
237 );
238
239 // The updates might have lagged, but the room event cache might have
240 // events, so retrieve them and add them back again to the timeline,
241 // after clearing it.
242 //
243 // If we can't get a handle on the room cache's events, just clear the
244 // current timeline.
245 let (initial_events, _stream) = room_event_cache.subscribe().await;
246
247 inner
248 .replace_with_initial_remote_events(
249 initial_events.into_iter(),
250 RemoteEventOrigin::Sync,
251 )
252 .await;
253
254 continue;
255 }
256 };
257
258 match update {
259 RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
260 trace!(target = %event_id, "Handling fully read marker.");
261 inner.handle_fully_read_marker(event_id).await;
262 }
263
264 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
265 trace!("Received new timeline events diffs");
266
267 // We shouldn't use the general way of adding events to timelines to
268 // non-live timelines, such as pinned events or focused timeline.
269 // These timelines should handle any live updates by themselves.
270 if !is_live {
271 continue;
272 }
273
274 inner
275 .handle_remote_events_with_diffs(
276 diffs,
277 match origin {
278 EventsOrigin::Sync => RemoteEventOrigin::Sync,
279 EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
280 },
281 )
282 .await;
283 }
284
285 RoomEventCacheUpdate::AddEphemeralEvents { events } => {
286 trace!("Received new ephemeral events from sync.");
287
288 // TODO: (bnjbvr) ephemeral should be handled by the event cache.
289 inner.handle_ephemeral_events(events).await;
290 }
291
292 RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
293 if !ambiguity_changes.is_empty() {
294 let member_ambiguity_changes = ambiguity_changes
295 .values()
296 .flat_map(|change| change.user_ids())
297 .collect::<BTreeSet<_>>();
298 inner.force_update_sender_profiles(&member_ambiguity_changes).await;
299 }
300 }
301 }
302 }
303 }
304 .instrument(span)
305 });
306
307 let local_echo_listener_handle = {
308 let timeline = controller.clone();
309 let (local_echoes, mut listener) = room.send_queue().subscribe().await?;
310
311 spawn({
312 // Handles existing local echoes first.
313 for echo in local_echoes {
314 timeline.handle_local_echo(echo).await;
315 }
316
317 let span = info_span!(
318 parent: Span::none(),
319 "local_echo_handler",
320 room_id = ?room.room_id(),
321 focus = focus.debug_string(),
322 prefix = internal_id_prefix
323 );
324 span.follows_from(Span::current());
325
326 // React to future local echoes too.
327 async move {
328 info!("spawned the local echo handler!");
329
330 loop {
331 match listener.recv().await {
332 Ok(update) => timeline.handle_room_send_queue_update(update).await,
333
334 Err(RecvError::Lagged(num_missed)) => {
335 warn!("missed {num_missed} local echoes, ignoring those missed");
336 }
337
338 Err(RecvError::Closed) => {
339 info!("channel closed, exiting the local echo handler");
340 break;
341 }
342 }
343 }
344 }
345 .instrument(span)
346 })
347 };
348
349 // Not using room.add_event_handler here because RoomKey events are
350 // to-device events that are not received in the context of a room.
351
352 let room_key_handle = client.add_event_handler(handle_room_key_event(
353 controller.clone(),
354 room.room_id().to_owned(),
355 ));
356
357 let forwarded_room_key_handle = client.add_event_handler(handle_forwarded_room_key_event(
358 controller.clone(),
359 room.room_id().to_owned(),
360 ));
361
362 let handles = vec![room_key_handle, forwarded_room_key_handle];
363
364 let room_key_from_backups_join_handle = {
365 let inner = controller.clone();
366 let room_id = inner.room().room_id();
367
368 let stream = client.encryption().backups().room_keys_for_room_stream(room_id);
369
370 spawn(async move {
371 pin_mut!(stream);
372
373 while let Some(update) = stream.next().await {
374 let room = inner.room();
375
376 match update {
377 Ok(info) => {
378 let mut session_ids = BTreeSet::new();
379
380 for set in info.into_values() {
381 session_ids.extend(set);
382 }
383
384 inner.retry_event_decryption(room, Some(session_ids)).await;
385 }
386 // We lagged, so retry every event.
387 Err(_) => inner.retry_event_decryption(room, None).await,
388 }
389 }
390 })
391 };
392
393 let room_key_backup_enabled_join_handle = {
394 let inner = controller.clone();
395 let stream = client.encryption().backups().state_stream();
396
397 spawn(async move {
398 pin_mut!(stream);
399
400 while let Some(update) = stream.next().await {
401 match update {
402 // If the backup got enabled, or we lagged and thus missed that the backup
403 // might be enabled, retry to decrypt all the events. Please note, depending
404 // on the backup download strategy, this might do two things under the
405 // assumption that the backup contains the relevant room keys:
406 //
407 // 1. It will decrypt the events, if `BackupDownloadStrategy` has been set
408 // to `OneShot`.
409 // 2. It will fail to decrypt the event, but try to download the room key to
410 // decrypt it if the `BackupDownloadStrategy` has been set to
411 // `AfterDecryptionFailure`.
412 Ok(BackupState::Enabled) | Err(_) => {
413 let room = inner.room();
414 inner.retry_event_decryption(room, None).await;
415 }
416 // The other states aren't interesting since they are either still enabling
417 // the backup or have the backup in the disabled state.
418 Ok(
419 BackupState::Unknown
420 | BackupState::Creating
421 | BackupState::Resuming
422 | BackupState::Disabling
423 | BackupState::Downloading
424 | BackupState::Enabling,
425 ) => (),
426 }
427 }
428 })
429 };
430
431 // TODO: Technically, this should be the only stream we need to listen to get
432 // notified when we should retry to decrypt an event. We sadly can't do that,
433 // since the cross-process support kills the `OlmMachine` which then in
434 // turn kills this stream. Once this is solved remove all the other ways we
435 // listen for room keys.
436 let room_keys_received_join_handle = {
437 let inner = controller.clone();
438 let stream = client.encryption().room_keys_received_stream().await.expect(
439 "We should be logged in by now, so we should have access to an OlmMachine \
440 to be able to listen to this stream",
441 );
442
443 spawn(async move {
444 pin_mut!(stream);
445
446 while let Some(room_keys) = stream.next().await {
447 let session_ids = match room_keys {
448 Ok(room_keys) => {
449 let session_ids: BTreeSet<String> = room_keys
450 .into_iter()
451 .filter(|info| info.room_id == inner.room().room_id())
452 .map(|info| info.session_id)
453 .collect();
454
455 Some(session_ids)
456 }
457 Err(BroadcastStreamRecvError::Lagged(missed_updates)) => {
458 // We lagged, let's retry to decrypt anything we have, maybe something
459 // was received.
460 warn!(missed_updates, "The room keys stream has lagged, retrying to decrypt the whole timeline");
461
462 None
463 }
464 };
465
466 let room = inner.room();
467 inner.retry_event_decryption(room, session_ids).await;
468 }
469 })
470 };
471
472 let timeline = Timeline {
473 controller,
474 event_cache: room_event_cache,
475 drop_handle: Arc::new(TimelineDropHandle {
476 client,
477 event_handler_handles: handles,
478 room_update_join_handle,
479 pinned_events_join_handle,
480 room_key_from_backups_join_handle,
481 room_key_backup_enabled_join_handle,
482 room_keys_received_join_handle,
483 local_echo_listener_handle,
484 _event_cache_drop_handle: event_cache_drop,
485 encryption_changes_handle,
486 }),
487 };
488
489 if has_events {
490 // The events we're injecting might be encrypted events, but we might
491 // have received the room key to decrypt them while nobody was listening to the
492 // `m.room_key` event, let's retry now.
493 timeline.retry_decryption_for_all_events().await;
494 }
495
496 Ok(timeline)
497 }
498}