1use std::sync::Arc;
16
17use matrix_sdk::Room;
18use matrix_sdk_base::{SendOutsideWasm, SyncOutsideWasm};
19use ruma::{events::AnySyncTimelineEvent, room_version_rules::RoomVersionRules};
20use tracing::{Instrument, Span, info_span};
21
22use super::{
23 DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
24 controller::{TimelineController, TimelineSettings},
25};
26use crate::{
27 timeline::{
28 TimelineReadReceiptTracking,
29 controller::spawn_crypto_tasks,
30 tasks::{
31 pinned_events_task, room_event_cache_updates_task, room_send_queue_update_task,
32 thread_updates_task,
33 },
34 },
35 unable_to_decrypt_hook::UtdHookManager,
36};
37
38#[must_use]
41#[derive(Debug)]
42pub struct TimelineBuilder {
43 room: Room,
44 settings: TimelineSettings,
45 focus: TimelineFocus,
46
47 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
50
51 internal_id_prefix: Option<String>,
53}
54
55impl TimelineBuilder {
56 pub fn new(room: &Room) -> Self {
57 Self {
58 room: room.clone(),
59 settings: TimelineSettings::default(),
60 unable_to_decrypt_hook: None,
61 focus: TimelineFocus::Live { hide_threaded_events: false },
62 internal_id_prefix: None,
63 }
64 }
65
66 pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
73 self.focus = focus;
74 self
75 }
76
77 pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
82 self.unable_to_decrypt_hook = Some(hook);
83 self
84 }
85
86 pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
91 self.internal_id_prefix = Some(prefix);
92 self
93 }
94
95 pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
98 self.settings.date_divider_mode = mode;
99 self
100 }
101
102 pub fn track_read_marker_and_receipts(mut self, tracking: TimelineReadReceiptTracking) -> Self {
105 self.settings.track_read_receipts = tracking;
106 self
107 }
108
109 pub fn event_filter<F>(mut self, filter: F) -> Self
135 where
136 F: Fn(&AnySyncTimelineEvent, &RoomVersionRules) -> bool
137 + SendOutsideWasm
138 + SyncOutsideWasm
139 + 'static,
140 {
141 self.settings.event_filter = Arc::new(filter);
142 self
143 }
144
145 pub fn add_failed_to_parse(mut self, add: bool) -> Self {
149 self.settings.add_failed_to_parse = add;
150 self
151 }
152
153 #[tracing::instrument(
155 skip(self),
156 fields(
157 room_id = ?self.room.room_id(),
158 track_read_receipts = ?self.settings.track_read_receipts,
159 )
160 )]
161 pub async fn build(self) -> Result<Timeline, Error> {
162 let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;
163
164 room.client().event_cache().subscribe()?;
166
167 let (room_event_cache, event_cache_drop) = room.event_cache().await?;
168 let (_, event_subscriber) = room_event_cache.subscribe().await?;
169
170 let is_room_encrypted = room
171 .latest_encryption_state()
172 .await
173 .map(|state| state.is_encrypted())
174 .ok()
175 .unwrap_or_default();
176
177 let controller = TimelineController::new(
178 room.clone(),
179 focus.clone(),
180 internal_id_prefix.clone(),
181 unable_to_decrypt_hook,
182 is_room_encrypted,
183 settings,
184 );
185
186 let has_events = controller.init_focus(&focus, &room_event_cache).await?;
187
188 let pinned_events_join_handle = if matches!(focus, TimelineFocus::PinnedEvents) {
189 let (_initial_events, pinned_events_recv) =
190 room_event_cache.subscribe_to_pinned_events().await?;
191
192 Some(
193 room.client()
194 .task_monitor()
195 .spawn_background_task(
196 "timeline::pinned_event_cache_updates",
197 pinned_events_task(
198 room_event_cache.clone(),
199 controller.clone(),
200 pinned_events_recv,
201 ),
202 )
203 .abort_on_drop(),
204 )
205 } else {
206 None
207 };
208
209 let room_update_join_handle = room
210 .client()
211 .task_monitor()
212 .spawn_background_task("timeline::room_event_cache_updates", {
213 let span = info_span!(
214 parent: Span::none(),
215 "live_update_handler",
216 room_id = ?room.room_id(),
217 focus = focus.debug_string(),
218 prefix = internal_id_prefix
219 );
220 span.follows_from(Span::current());
221
222 room_event_cache_updates_task(
223 room_event_cache.clone(),
224 controller.clone(),
225 event_subscriber,
226 focus.clone(),
227 )
228 .instrument(span)
229 })
230 .abort_on_drop();
231
232 let thread_update_join_handle =
233 if let TimelineFocus::Thread { root_event_id: root } = &focus {
234 Some({
235 let span = info_span!(
236 parent: Span::none(),
237 "thread_live_update_handler",
238 room_id = ?room.room_id(),
239 focus = focus.debug_string(),
240 prefix = internal_id_prefix
241 );
242 span.follows_from(Span::current());
243
244 let (_events, receiver) =
247 room_event_cache.subscribe_to_thread(root.clone()).await?;
248
249 room.client()
250 .task_monitor()
251 .spawn_background_task(
252 "timeline::thread_event_cache_updates",
253 thread_updates_task(
254 receiver,
255 room_event_cache.clone(),
256 controller.clone(),
257 root.clone(),
258 )
259 .instrument(span),
260 )
261 .abort_on_drop()
262 })
263 } else {
264 None
265 };
266
267 let local_echo_listener_handle = {
268 let timeline_controller = controller.clone();
269 let (local_echoes, send_queue_stream) = room.send_queue().subscribe().await?;
270
271 room.client().task_monitor().spawn_background_task("timeline::local_echo_listener", {
272 for echo in local_echoes {
274 timeline_controller.handle_local_echo(echo).await;
275 }
276
277 let span = info_span!(
278 parent: Span::none(),
279 "local_echo_handler",
280 room_id = ?room.room_id(),
281 focus = focus.debug_string(),
282 prefix = internal_id_prefix
283 );
284 span.follows_from(Span::current());
285
286 room_send_queue_update_task(send_queue_stream, timeline_controller).instrument(span)
287 })
288 };
289
290 let crypto_drop_handles = spawn_crypto_tasks(controller.clone()).await;
291
292 let timeline = Timeline {
293 controller,
294 event_cache: room_event_cache,
295 drop_handle: Arc::new(TimelineDropHandle {
296 _crypto_drop_handles: crypto_drop_handles,
297 _room_update_join_handle: room_update_join_handle,
298 _thread_update_join_handle: thread_update_join_handle,
299 _pinned_events_join_handle: pinned_events_join_handle,
300 _local_echo_listener_handle: local_echo_listener_handle,
301 _event_cache_drop_handle: event_cache_drop,
302 }),
303 };
304
305 if has_events {
306 timeline.retry_decryption_for_all_events().await;
310 }
311
312 Ok(timeline)
313 }
314}