1use std::sync::Arc;
16
17use matrix_sdk::Room;
18use matrix_sdk_base::{SendOutsideWasm, SyncOutsideWasm};
19use ruma::{events::AnySyncTimelineEvent, room_version_rules::RoomVersionRules};
20use tracing::{Instrument, Span, info_span};
21
22use super::{
23 DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
24 controller::{TimelineController, TimelineSettings},
25};
26use crate::{
27 timeline::{
28 PaginationError, TimelineReadReceiptTracking,
29 controller::spawn_crypto_tasks,
30 tasks::{
31 event_focused_task, pinned_events_task, room_event_cache_updates_task,
32 room_send_queue_update_task, thread_updates_task,
33 },
34 },
35 unable_to_decrypt_hook::UtdHookManager,
36};
37
38#[must_use]
41#[derive(Debug)]
42pub struct TimelineBuilder {
43 room: Room,
44 settings: TimelineSettings,
45 focus: TimelineFocus,
46
47 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
50
51 internal_id_prefix: Option<String>,
53}
54
55impl TimelineBuilder {
56 pub fn new(room: &Room) -> Self {
57 Self {
58 room: room.clone(),
59 settings: TimelineSettings::default(),
60 unable_to_decrypt_hook: None,
61 focus: TimelineFocus::Live { hide_threaded_events: false },
62 internal_id_prefix: None,
63 }
64 }
65
66 pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
73 self.focus = focus;
74 self
75 }
76
77 pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
82 self.unable_to_decrypt_hook = Some(hook);
83 self
84 }
85
86 pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
91 self.internal_id_prefix = Some(prefix);
92 self
93 }
94
95 pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
98 self.settings.date_divider_mode = mode;
99 self
100 }
101
102 pub fn track_read_marker_and_receipts(mut self, tracking: TimelineReadReceiptTracking) -> Self {
105 self.settings.track_read_receipts = tracking;
106 self
107 }
108
109 pub fn event_filter<F>(mut self, filter: F) -> Self
135 where
136 F: Fn(&AnySyncTimelineEvent, &RoomVersionRules) -> bool
137 + SendOutsideWasm
138 + SyncOutsideWasm
139 + 'static,
140 {
141 self.settings.event_filter = Arc::new(filter);
142 self
143 }
144
145 pub fn add_failed_to_parse(mut self, add: bool) -> Self {
149 self.settings.add_failed_to_parse = add;
150 self
151 }
152
153 #[tracing::instrument(
155 skip(self),
156 fields(
157 room_id = ?self.room.room_id(),
158 track_read_receipts = ?self.settings.track_read_receipts,
159 )
160 )]
161 pub async fn build(self) -> Result<Timeline, Error> {
162 let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;
163
164 room.client().event_cache().subscribe()?;
166
167 let (room_event_cache, event_cache_drop) = room.event_cache().await?;
168 let (_, event_subscriber) = room_event_cache.subscribe().await?;
169
170 let is_room_encrypted = room
171 .latest_encryption_state()
172 .await
173 .map(|state| state.is_encrypted())
174 .ok()
175 .unwrap_or_default();
176
177 let controller = TimelineController::new(
178 room.clone(),
179 focus.clone(),
180 internal_id_prefix.clone(),
181 unable_to_decrypt_hook,
182 is_room_encrypted,
183 settings,
184 );
185
186 let has_events = controller.init_focus(&focus, &room_event_cache).await?;
187
188 let pinned_events_join_handle = if matches!(focus, TimelineFocus::PinnedEvents) {
189 let (_initial_events, pinned_events_recv) =
190 room_event_cache.subscribe_to_pinned_events().await?;
191
192 Some(
193 room.client()
194 .task_monitor()
195 .spawn_background_task(
196 "timeline::pinned_event_cache_updates",
197 pinned_events_task(
198 room_event_cache.clone(),
199 controller.clone(),
200 pinned_events_recv,
201 ),
202 )
203 .abort_on_drop(),
204 )
205 } else {
206 None
207 };
208
209 let event_focused_join_handle =
210 if let TimelineFocus::Event { target, thread_mode, .. } = &focus {
211 let cache = room_event_cache
212 .get_event_focused_cache(target.clone(), (*thread_mode).into())
213 .await?
214 .ok_or(Error::PaginationError(PaginationError::MissingCache))?;
215
216 let (_initial_events, recv) = cache.subscribe().await;
217
218 Some(room.client().task_monitor().spawn_background_task(
219 "timeline::event_focused_cache_updates",
220 event_focused_task(
221 target.clone(),
222 (*thread_mode).into(),
223 room_event_cache.clone(),
224 controller.clone(),
225 recv,
226 ),
227 ))
228 } else {
229 None
230 };
231
232 let room_update_join_handle = room
233 .client()
234 .task_monitor()
235 .spawn_background_task("timeline::room_event_cache_updates", {
236 let span = info_span!(
237 parent: Span::none(),
238 "live_update_handler",
239 room_id = ?room.room_id(),
240 focus = focus.debug_string(),
241 prefix = internal_id_prefix
242 );
243 span.follows_from(Span::current());
244
245 room_event_cache_updates_task(
246 room_event_cache.clone(),
247 controller.clone(),
248 event_subscriber,
249 focus.clone(),
250 )
251 .instrument(span)
252 })
253 .abort_on_drop();
254
255 let thread_update_join_handle =
256 if let TimelineFocus::Thread { root_event_id: root } = &focus {
257 Some({
258 let span = info_span!(
259 parent: Span::none(),
260 "thread_live_update_handler",
261 room_id = ?room.room_id(),
262 focus = focus.debug_string(),
263 prefix = internal_id_prefix
264 );
265 span.follows_from(Span::current());
266
267 let (_events, receiver) =
270 room_event_cache.subscribe_to_thread(root.clone()).await?;
271
272 room.client()
273 .task_monitor()
274 .spawn_background_task(
275 "timeline::thread_event_cache_updates",
276 thread_updates_task(
277 receiver,
278 room_event_cache.clone(),
279 controller.clone(),
280 root.clone(),
281 )
282 .instrument(span),
283 )
284 .abort_on_drop()
285 })
286 } else {
287 None
288 };
289
290 let local_echo_listener_handle = {
291 let timeline_controller = controller.clone();
292 let (local_echoes, send_queue_stream) = room.send_queue().subscribe().await?;
293
294 room.client().task_monitor().spawn_background_task("timeline::local_echo_listener", {
295 for echo in local_echoes {
297 timeline_controller.handle_local_echo(echo).await;
298 }
299
300 let span = info_span!(
301 parent: Span::none(),
302 "local_echo_handler",
303 room_id = ?room.room_id(),
304 focus = focus.debug_string(),
305 prefix = internal_id_prefix
306 );
307 span.follows_from(Span::current());
308
309 room_send_queue_update_task(send_queue_stream, timeline_controller).instrument(span)
310 })
311 };
312
313 let crypto_drop_handles = spawn_crypto_tasks(controller.clone()).await;
314
315 let timeline = Timeline {
316 controller,
317 event_cache: room_event_cache,
318 drop_handle: Arc::new(TimelineDropHandle {
319 _crypto_drop_handles: crypto_drop_handles,
320 _room_update_join_handle: room_update_join_handle,
321 _thread_update_join_handle: thread_update_join_handle,
322 _pinned_events_join_handle: pinned_events_join_handle,
323 _local_echo_listener_handle: local_echo_listener_handle,
324 _event_focused_join_handle: event_focused_join_handle,
325 _event_cache_drop_handle: event_cache_drop,
326 }),
327 };
328
329 if has_events {
330 timeline.retry_decryption_for_all_events().await;
334 }
335
336 Ok(timeline)
337 }
338}