1use std::sync::Arc;
33
34use matrix_sdk_base::{
35 deserialized_responses::TimelineEvent,
36 event_cache::{Event, Gap},
37 linked_chunk::OwnedLinkedChunkId,
38};
39use matrix_sdk_common::{linked_chunk::ChunkIdentifier, serde_helpers::extract_thread_root};
40use ruma::{OwnedEventId, UInt, api::Direction};
41use tokio::sync::{
42 RwLock,
43 broadcast::{Receiver, Sender},
44};
45use tracing::{instrument, trace};
46
47#[cfg(feature = "e2e-encryption")]
48use crate::event_cache::redecryptor::ResolvedUtd;
49use crate::{
50 Room,
51 event_cache::{
52 EventCacheError, EventsOrigin, Result, RoomEventCacheLinkedChunkUpdate,
53 caches::{TimelineVectorDiffs, event_linked_chunk::EventLinkedChunk},
54 },
55 paginators::{PaginationResult, Paginator, StartFromResult, thread::PaginableThread},
56 room::{IncludeRelations, MessagesOptions, RelationsOptions, WeakRoom},
57};
58
/// Selects how an event-focused cache decides whether it paginates a thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventFocusThreadMode {
    /// Always paginate a thread: the thread root carried by the focused event is
    /// used, and when the focused event carries none, the focused event itself is
    /// treated as the thread root.
    ForceThread,

    /// Paginate a thread only when the focused event itself carries a thread root;
    /// otherwise paginate the room timeline.
    Automatic,
}
78
79#[derive(Debug, Clone)]
81pub(crate) enum EventFocusedPaginationMode {
82 Room,
85
86 Thread {
88 thread_root: OwnedEventId,
90 },
91}
92
93struct EventFocusedCacheInner {
94 room: WeakRoom,
96
97 focused_event_id: OwnedEventId,
99
100 pagination_mode: EventFocusedPaginationMode,
102
103 chunk: EventLinkedChunk,
105
106 sender: Sender<TimelineVectorDiffs>,
108
109 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
111}
112
113impl EventFocusedCacheInner {
114 #[instrument(skip(self, room), fields(room_id = %self.room.room_id(), event_id = %self.focused_event_id))]
125 async fn start_from(
126 &mut self,
127 room: Room,
128 num_context_events: u16,
129 thread_mode: EventFocusThreadMode,
130 ) -> Result<StartFromResult> {
131 trace!(num_context_events, "fetching event with context via /context");
132
133 let paginator = Paginator::new(room);
134
135 let result =
136 paginator.start_from(&self.focused_event_id, UInt::from(num_context_events)).await?;
137
138 let thread_root = match thread_mode {
140 EventFocusThreadMode::ForceThread => {
141 let focused_event = result
143 .events
144 .iter()
145 .find(|event| event.event_id().as_ref() == Some(&self.focused_event_id));
146
147 let mut thread_root =
149 focused_event.and_then(|event| extract_thread_root(event.raw()));
150
151 if thread_root.is_none() {
154 thread_root = Some(self.focused_event_id.clone());
155 }
156
157 trace!("force thread mode enabled, treating focused event as thread root");
158 thread_root
159 }
160
161 EventFocusThreadMode::Automatic => {
162 trace!(
163 "automatic thread mode enabled, checking if focused event is part of a thread"
164 );
165 result
166 .events
167 .iter()
168 .find(|event| event.event_id().as_ref() == Some(&self.focused_event_id))
169 .and_then(|event| extract_thread_root(event.raw()))
170 }
171 };
172
173 let tokens = paginator.tokens();
175
176 if let Some(root_id) = thread_root {
177 trace!(thread_root = %root_id, "focused event is part of a thread, setting up thread pagination");
178
179 let includes_root =
183 result.events.iter().any(|event| event.event_id().as_ref() == Some(&root_id));
184
185 self.pagination_mode =
186 EventFocusedPaginationMode::Thread { thread_root: root_id.clone() };
187
188 let thread_events = result
190 .events
191 .iter()
192 .filter(|event| {
193 extract_thread_root(event.raw()).as_ref() == Some(&root_id)
194 || event.event_id().as_ref() == Some(&root_id)
195 })
196 .cloned()
197 .collect();
198
199 let backward_token = if includes_root {
201 None
203 } else {
204 tokens.previous.into_token()
205 };
206
207 let forward_token = tokens.next.into_token();
209
210 self.add_initial_events_with_gaps(thread_events, backward_token, forward_token);
211 } else {
212 trace!("focused event is not part of a thread, setting up room pagination");
213 self.pagination_mode = EventFocusedPaginationMode::Room;
214
215 let backward_token = tokens.previous.into_token();
216 let forward_token = tokens.next.into_token();
217
218 self.add_initial_events_with_gaps(result.events.clone(), backward_token, forward_token);
219 }
220
221 self.propagate_changes();
222
223 let _ = self.chunk.updates_as_vector_diffs();
231
232 Ok(result)
233 }
234
235 fn add_initial_events_with_gaps(
237 &mut self,
238 events: Vec<TimelineEvent>,
239 prev_gap_token: Option<String>,
240 next_gap_token: Option<String>,
241 ) {
242 self.chunk
245 .push_live_events(prev_gap_token.map(|prev_token| Gap { token: prev_token }), &events);
246
247 if let Some(next_token) = next_gap_token {
249 trace!("inserting forward pagination gap at back");
250 self.chunk.push_gap(Gap { token: next_token });
251 }
252 }
253
254 fn propagate_changes(&mut self) {
256 let updates = self.chunk.store_updates().take();
257 if !updates.is_empty() {
258 let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
259 updates,
260 linked_chunk_id: OwnedLinkedChunkId::EventFocused(
261 self.room.room_id().to_owned(),
262 self.focused_event_id.clone(),
263 ),
264 });
265 }
266 }
267
268 fn notify_subscribers(&mut self, origin: EventsOrigin) {
270 let diffs = self.chunk.updates_as_vector_diffs();
271 if !diffs.is_empty() {
272 let _ = self.sender.send(TimelineVectorDiffs { diffs, origin });
273 }
274 }
275
276 fn first_chunk_as_gap(&self) -> Option<(ChunkIdentifier, Gap)> {
278 self.chunk.first_chunk_as_gap()
279 }
280
281 fn last_chunk_as_gap(&self) -> Option<(ChunkIdentifier, Gap)> {
283 self.chunk.last_chunk_as_gap()
284 }
285
286 #[instrument(skip(self), fields(room_id = %self.room.room_id()))]
292 async fn paginate_backwards(&mut self, num_events: u16) -> Result<PaginationResult> {
293 let room = self.room.get().ok_or(EventCacheError::ClientDropped)?;
294
295 let Some((gap_id, gap)) = self.first_chunk_as_gap() else {
297 trace!("no front gap found, already at timeline start");
299 return Ok(PaginationResult { events: Vec::new(), hit_end_of_timeline: true });
300 };
301
302 let token = gap.token;
303 trace!(?token, "paginating backwards with token from front gap");
304
305 let (mut events, new_token) = match &self.pagination_mode {
307 EventFocusedPaginationMode::Room => {
308 Self::fetch_room_backwards(&room, num_events, &token).await?
309 }
310 EventFocusedPaginationMode::Thread { thread_root } => {
311 Self::fetch_thread_backwards(&room, num_events, &token, thread_root.clone()).await?
312 }
313 };
314
315 events.reverse();
318
319 let hit_end = new_token.is_none();
320 let new_gap = new_token.map(|t| Gap { token: t });
321
322 self.chunk.push_backwards_pagination_events(Some(gap_id), new_gap, &events);
324
325 self.propagate_changes();
326 self.notify_subscribers(EventsOrigin::Pagination);
327
328 Ok(PaginationResult { events, hit_end_of_timeline: hit_end })
329 }
330
331 async fn fetch_room_backwards(
337 room: &Room,
338 num_events: u16,
339 token: &str,
340 ) -> Result<(Vec<Event>, Option<String>)> {
341 let mut options = MessagesOptions::backward().from(token);
342 options.limit = UInt::from(num_events);
343
344 let messages = room
345 .messages(options)
346 .await
347 .map_err(|err| EventCacheError::PaginationError(Box::new(err)))?;
348
349 Ok((messages.chunk, messages.end))
350 }
351
352 async fn fetch_thread_backwards(
357 room: &Room,
358 num_events: u16,
359 token: &str,
360 thread_root: OwnedEventId,
361 ) -> Result<(Vec<Event>, Option<String>)> {
362 let options = RelationsOptions {
363 from: Some(token.to_owned()),
364 dir: Direction::Backward,
365 limit: Some(UInt::from(num_events)),
366 include_relations: IncludeRelations::AllRelations,
367 recurse: true,
368 };
369
370 let mut result = room
371 .relations(thread_root.clone(), options)
372 .await
373 .map_err(|err| EventCacheError::PaginationError(Box::new(err)))?;
374
375 if result.next_batch_token.is_none() {
377 let root_event = room
378 .load_event(&thread_root)
379 .await
380 .map_err(|err| EventCacheError::PaginationError(Box::new(err)))?;
381 result.chunk.push(root_event);
382 }
383
384 Ok((result.chunk, result.next_batch_token))
385 }
386
387 #[instrument(skip(self), fields(room_id = %self.room.room_id()))]
393 async fn paginate_forwards(&mut self, num_events: u16) -> Result<PaginationResult> {
394 let room = self.room.get().ok_or(EventCacheError::ClientDropped)?;
395
396 let Some((gap_id, gap)) = self.last_chunk_as_gap() else {
398 trace!("no back gap found, already at timeline end");
400 return Ok(PaginationResult { events: Vec::new(), hit_end_of_timeline: true });
401 };
402
403 let token = gap.token;
404 trace!(?token, "paginating forwards with token from back gap");
405
406 let (events, new_token) = match &self.pagination_mode {
408 EventFocusedPaginationMode::Room => {
409 Self::fetch_room_forwards(&room, num_events, &token).await?
410 }
411 EventFocusedPaginationMode::Thread { thread_root } => {
412 Self::fetch_thread_forwards(&room, num_events, &token, thread_root.clone()).await?
413 }
414 };
415
416 let hit_end = new_token.is_none();
417 let new_gap = new_token.map(|t| Gap { token: t });
418
419 self.chunk.push_forwards_pagination_events(Some(gap_id), new_gap, &events);
421
422 self.propagate_changes();
423 self.notify_subscribers(EventsOrigin::Pagination);
424
425 Ok(PaginationResult { events, hit_end_of_timeline: hit_end })
426 }
427
428 async fn fetch_room_forwards(
430 room: &Room,
431 num_events: u16,
432 token: &str,
433 ) -> Result<(Vec<Event>, Option<String>)> {
434 let mut options = MessagesOptions::new(Direction::Forward);
435 options = options.from(Some(token));
436 options.limit = UInt::from(num_events);
437
438 let messages = room
439 .messages(options)
440 .await
441 .map_err(|err| EventCacheError::PaginationError(Box::new(err)))?;
442
443 Ok((messages.chunk, messages.end))
444 }
445
446 async fn fetch_thread_forwards(
448 room: &Room,
449 num_events: u16,
450 token: &str,
451 thread_root: OwnedEventId,
452 ) -> Result<(Vec<Event>, Option<String>)> {
453 let options = RelationsOptions {
454 from: Some(token.to_owned()),
455 dir: Direction::Forward,
456 limit: Some(UInt::from(num_events)),
457 include_relations: IncludeRelations::AllRelations,
458 recurse: true,
459 };
460
461 let result = room
462 .relations(thread_root, options)
463 .await
464 .map_err(|err| EventCacheError::PaginationError(Box::new(err)))?;
465
466 Ok((result.chunk, result.next_batch_token))
467 }
468}
469
470#[derive(Clone)]
485pub struct EventFocusedCache {
486 inner: Arc<RwLock<EventFocusedCacheInner>>,
487}
488
489impl EventFocusedCache {
490 pub(super) fn new(
492 room: WeakRoom,
493 focused_event_id: OwnedEventId,
494 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
495 ) -> Self {
496 Self {
497 inner: Arc::new(RwLock::new(EventFocusedCacheInner {
498 room,
499 focused_event_id,
500 pagination_mode: EventFocusedPaginationMode::Room,
501 chunk: EventLinkedChunk::new(),
502 sender: Sender::new(32),
503 linked_chunk_update_sender,
504 })),
505 }
506 }
507
508 pub async fn subscribe(&self) -> (Vec<Event>, Receiver<TimelineVectorDiffs>) {
510 let inner = self.inner.read().await;
511 let events = inner.chunk.events().map(|(_position, item)| item.clone()).collect();
512 let recv = inner.sender.subscribe();
513 (events, recv)
514 }
515
516 pub async fn hit_timeline_start(&self) -> bool {
519 self.inner.read().await.first_chunk_as_gap().is_none()
520 }
521
522 pub async fn hit_timeline_end(&self) -> bool {
525 self.inner.read().await.last_chunk_as_gap().is_none()
526 }
527
528 pub(super) async fn start_from(
531 &self,
532 room: Room,
533 num_context_events: u16,
534 thread_mode: EventFocusThreadMode,
535 ) -> Result<StartFromResult> {
536 self.inner.write().await.start_from(room, num_context_events, thread_mode).await
537 }
538
539 pub async fn paginate_backwards(&self, num_events: u16) -> Result<PaginationResult> {
542 self.inner.write().await.paginate_backwards(num_events).await
543 }
544
545 pub async fn paginate_forwards(&self, num_events: u16) -> Result<PaginationResult> {
548 self.inner.write().await.paginate_forwards(num_events).await
549 }
550
551 pub async fn thread_root(&self) -> Option<OwnedEventId> {
553 match &self.inner.read().await.pagination_mode {
554 EventFocusedPaginationMode::Thread { thread_root } => Some(thread_root.clone()),
555 _ => None,
556 }
557 }
558
559 #[cfg(feature = "e2e-encryption")]
563 pub async fn replace_utds(&self, events: &[ResolvedUtd]) {
564 let mut guard = self.inner.write().await;
565 if guard.chunk.replace_utds(events) {
566 guard.propagate_changes();
567 guard.notify_subscribers(EventsOrigin::Cache);
568 }
569 }
570}
571
572#[cfg(not(tarpaulin_include))]
573impl std::fmt::Debug for EventFocusedCache {
574 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
575 f.debug_struct("EventFocusedCache").finish_non_exhaustive()
576 }
577}