Skip to main content

matrix_sdk/event_cache/caches/event_focused/
mod.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Event-focused timeline cache.
16//!
17//! This module provides [`EventFocusedCache`], a cache for an event-focused
18//! timeline (e.g., for permalinks). It handles:
19//! - Initialization from a focused event via `/context`.
20//! - Thread detection (if the focused event is in a thread).
21//! - Forward and backward pagination.
22//! - In-memory storage, as these linked chunks are meant to be short-lived.
23//!
24//! Pagination tokens are stored as Gap items in the linked chunk:
25//! - Backward token: Gap at the front of the linked chunk.
26//! - Forward token: Gap at the back of the linked chunk.
27//!
28//! This allows pagination to resume at any point, and supports a future use
29//! case where we'd want to persist these caches on disk (e.g., for permalinks
30//! to work across sessions).
31
32use std::sync::Arc;
33
34use matrix_sdk_base::{
35    deserialized_responses::TimelineEvent,
36    event_cache::{Event, Gap},
37    linked_chunk::OwnedLinkedChunkId,
38};
39use matrix_sdk_common::{linked_chunk::ChunkIdentifier, serde_helpers::extract_thread_root};
40use ruma::{OwnedEventId, UInt, api::Direction};
41use tokio::sync::{
42    RwLock,
43    broadcast::{Receiver, Sender},
44};
45use tracing::{instrument, trace};
46
47#[cfg(feature = "e2e-encryption")]
48use crate::event_cache::redecryptor::ResolvedUtd;
49use crate::{
50    Room,
51    event_cache::{
52        EventCacheError, EventsOrigin, Result, RoomEventCacheLinkedChunkUpdate,
53        caches::{TimelineVectorDiffs, event_linked_chunk::EventLinkedChunk},
54    },
55    paginators::{PaginationResult, Paginator, StartFromResult, thread::PaginableThread},
56    room::{IncludeRelations, MessagesOptions, RelationsOptions, WeakRoom},
57};
58
/// Options for controlling the behaviour of an `EventFocusedCache` when the
/// focused event may be part of a thread, or a thread's root.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum EventFocusThreadMode {
    /// Force the timeline represented by the linked chunk to only include
    /// threaded events.
    ///
    /// When the focused event is part of a thread, the linked chunk will be
    /// focused on that thread's root. Otherwise, the linked chunk will
    /// treat the target event itself as the thread root. Threaded events
    /// will never be hidden.
    ForceThread,

    /// Automatically determine if the target event is part of a thread or not.
    ///
    /// If the event is part of a thread, the linked chunk will be filtered to
    /// on-thread events. Otherwise, threaded events are hidden from the
    /// room-focused timeline.
    Automatic,
}
78
/// The mode of pagination for an event-focused linked chunk.
#[derive(Debug, Clone)]
pub(crate) enum EventFocusedPaginationMode {
    /// Standard room pagination (for all events as for an unthreaded/main room
    /// linked chunk), backed by the `/messages` API.
    Room {
        /// Whether threaded events must be filtered out of the paginated
        /// batches.
        hide_thread_events: bool,
    },

    /// Threaded pagination (the focused event is part of a thread), backed by
    /// the `/relations` API.
    Thread {
        /// The root event ID of the thread.
        thread_root: OwnedEventId,
    },
}
92
/// State shared behind the lock of an [`EventFocusedCache`].
struct EventFocusedCacheInner {
    /// The room owning this event-focused cache.
    ///
    /// Held weakly: pagination fails with `EventCacheError::ClientDropped` if
    /// the room can't be upgraded anymore.
    room: WeakRoom,

    /// The focused event ID.
    focused_event_id: OwnedEventId,

    /// The pagination mode (room or thread).
    ///
    /// Decided in `start_from`, based on whether the focused event belongs to
    /// a thread.
    pagination_mode: EventFocusedPaginationMode,

    /// The linked chunk for this event-focused cache.
    ///
    /// Pagination tokens are stored inside it as gap chunks (front = backward
    /// token, back = forward token).
    chunk: EventLinkedChunk,

    /// A sender of timeline updates.
    sender: Sender<TimelineVectorDiffs>,

    /// A sender for globally observable linked chunk updates.
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
}
112
113impl EventFocusedCacheInner {
114    /// Initialize the cache from a focused event.
115    ///
116    /// This uses `/context` to fetch the event with surrounding context.
117    ///
118    /// This detects if the event is part of a thread and sets up the
119    /// appropriate pagination mode.
120    ///
121    /// Pagination tokens are stored as gaps in the linked chunk:
122    /// - Backward token (start): Gap at the front of the linked chunk.
123    /// - Forward token (end): Gap at the back of the linked chunk.
124    #[instrument(skip(self, room), fields(room_id = %self.room.room_id(), event_id = %self.focused_event_id))]
125    async fn start_from(
126        &mut self,
127        room: Room,
128        num_context_events: u16,
129        thread_mode: EventFocusThreadMode,
130    ) -> Result<StartFromResult> {
131        trace!(num_context_events, "fetching event with context via /context");
132
133        let paginator = Paginator::new(room);
134
135        let result =
136            paginator.start_from(&self.focused_event_id, UInt::from(num_context_events)).await?;
137
138        // Detect if the focused event is part of a thread.
139        let thread_root = match thread_mode {
140            EventFocusThreadMode::ForceThread => {
141                // Try to extract a thread root from the focused event, first.
142                let focused_event = result
143                    .events
144                    .iter()
145                    .find(|event| event.event_id().as_ref() == Some(&self.focused_event_id));
146
147                // If the focused event has a thread root, use it.
148                let mut thread_root =
149                    focused_event.and_then(|event| extract_thread_root(event.raw()));
150
151                // If there's no thread root, consider that the focused event itself is the
152                // thread root.
153                if thread_root.is_none() {
154                    thread_root = Some(self.focused_event_id.clone());
155                }
156
157                trace!("force thread mode enabled, treating focused event as thread root");
158                thread_root
159            }
160
161            EventFocusThreadMode::Automatic => {
162                trace!(
163                    "automatic thread mode enabled, checking if focused event is part of a thread"
164                );
165                result
166                    .events
167                    .iter()
168                    .find(|event| event.event_id().as_ref() == Some(&self.focused_event_id))
169                    .and_then(|event| extract_thread_root(event.raw()))
170            }
171        };
172
173        // Get pagination tokens from the paginator.
174        let tokens = paginator.tokens();
175
176        if let Some(root_id) = thread_root {
177            trace!(thread_root = %root_id, "focused event is part of a thread, setting up thread pagination");
178
179            // Check if the thread root is included in the response. Start from the
180            // beginning, since it's more likely to be around there, in that
181            // case.
182            let includes_root =
183                result.events.iter().any(|event| event.event_id().as_ref() == Some(&root_id));
184
185            self.pagination_mode =
186                EventFocusedPaginationMode::Thread { thread_root: root_id.clone() };
187
188            // Filter events to only include those in the thread.
189            let thread_events = result
190                .events
191                .iter()
192                .filter(|event| {
193                    extract_thread_root(event.raw()).as_ref() == Some(&root_id)
194                        || event.event_id().as_ref() == Some(&root_id)
195                })
196                .cloned()
197                .collect();
198
199            // Determine backward token (only if we don't have the thread root).
200            let backward_token = if includes_root {
201                // We have the root, no need for backward pagination.
202                None
203            } else {
204                tokens.previous.into_token()
205            };
206
207            // Forward token.
208            let forward_token = tokens.next.into_token();
209
210            self.add_initial_events_with_gaps(thread_events, backward_token, forward_token);
211        } else {
212            trace!("focused event is not part of a thread, setting up room pagination");
213
214            let backward_token = tokens.previous.into_token();
215            let forward_token = tokens.next.into_token();
216
217            let hide_thread_events =
218                matches!(thread_mode, EventFocusThreadMode::Automatic) && thread_root.is_none();
219
220            self.pagination_mode = EventFocusedPaginationMode::Room { hide_thread_events };
221
222            let events = if hide_thread_events {
223                result
224                    .events
225                    .iter()
226                    .filter(|event| extract_thread_root(event.raw()).is_none())
227                    .cloned()
228                    .collect()
229            } else {
230                result.events.clone()
231            };
232
233            self.add_initial_events_with_gaps(events, backward_token, forward_token);
234        }
235
236        self.propagate_changes();
237
238        // Empty the updates_as_vector_diffs(), since it's impossible for an observer to
239        // have subscribed to this cache yet, since this code is part of the constructor
240        // flow.
241        //
242        // If we didn't empty those, such initial updates would be duplicated, since the
243        // subscriber would get the full initial list of events as diffs and as a set of
244        // initial events.
245        let _ = self.chunk.updates_as_vector_diffs();
246
247        Ok(result)
248    }
249
250    /// Add initial events to the chunk, with gaps for pagination tokens.
251    fn add_initial_events_with_gaps(
252        &mut self,
253        events: Vec<TimelineEvent>,
254        prev_gap_token: Option<String>,
255        next_gap_token: Option<String>,
256    ) {
257        // Insert backward gap at the back if we have a token, and the events
258        // themselves.
259        self.chunk
260            .push_live_events(prev_gap_token.map(|prev_token| Gap { token: prev_token }), &events);
261
262        // Insert forward gap at back if we have a token.
263        if let Some(next_token) = next_gap_token {
264            trace!("inserting forward pagination gap at back");
265            self.chunk.push_gap(Gap { token: next_token });
266        }
267    }
268
269    /// Propagate changes to the linked chunk update sender.
270    fn propagate_changes(&mut self) {
271        let updates = self.chunk.store_updates().take();
272        if !updates.is_empty() {
273            let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
274                updates,
275                linked_chunk_id: OwnedLinkedChunkId::EventFocused(
276                    self.room.room_id().to_owned(),
277                    self.focused_event_id.clone(),
278                ),
279            });
280        }
281    }
282
283    /// Notify subscribers of timeline updates.
284    fn notify_subscribers(&mut self, origin: EventsOrigin) {
285        let diffs = self.chunk.updates_as_vector_diffs();
286        if !diffs.is_empty() {
287            let _ = self.sender.send(TimelineVectorDiffs { diffs, origin });
288        }
289    }
290
    /// Return the first chunk as a gap, if it's one.
    ///
    /// A front gap holds the backward pagination token; its absence means
    /// we've reached the start of the timeline.
    fn first_chunk_as_gap(&self) -> Option<(ChunkIdentifier, Gap)> {
        self.chunk.first_chunk_as_gap()
    }

    /// Return the last chunk as a gap, if it's one.
    ///
    /// A back gap holds the forward pagination token; its absence means we've
    /// reached the end of the timeline.
    fn last_chunk_as_gap(&self) -> Option<(ChunkIdentifier, Gap)> {
        self.chunk.last_chunk_as_gap()
    }
300
    /// Paginate backwards in this event-focused linked chunk.
    ///
    /// This finds the gap at the front of the linked chunk, fetches older
    /// events, replaces the gap with the events, and inserts a new gap if
    /// there are more events to fetch.
    ///
    /// When there's no front gap, this returns an empty result with
    /// `hit_end_of_timeline` set to `true` without making a network request.
    #[instrument(skip(self), fields(room_id = %self.room.room_id()))]
    async fn paginate_backwards(&mut self, num_events: u16) -> Result<PaginationResult> {
        let room = self.room.get().ok_or(EventCacheError::ClientDropped)?;

        // Find the gap at the front (backward pagination token).
        let Some((gap_id, gap)) = self.first_chunk_as_gap() else {
            // No gap at front means we've already hit the start of the timeline.
            trace!("no front gap found, already at timeline start");
            return Ok(PaginationResult { events: Vec::new(), hit_end_of_timeline: true });
        };

        let token = gap.token;
        trace!(?token, "paginating backwards with token from front gap");

        // Fetch events based on pagination mode.
        let (mut events, new_token) = match &self.pagination_mode {
            EventFocusedPaginationMode::Room { .. } => {
                Self::fetch_room_backwards(&room, num_events, &token).await?
            }
            EventFocusedPaginationMode::Thread { thread_root } => {
                Self::fetch_thread_backwards(&room, num_events, &token, thread_root.clone()).await?
            }
        };

        // Events are in the reverse order, per the API contracts defined in the two
        // fetch methods.
        events.reverse();

        // The absence of a continuation token means the start of the timeline has
        // been reached, and no new gap must be inserted.
        let hit_end = new_token.is_none();
        let new_gap = new_token.map(|t| Gap { token: t });

        let hide_thread_events = match &self.pagination_mode {
            EventFocusedPaginationMode::Room { hide_thread_events } => *hide_thread_events,
            EventFocusedPaginationMode::Thread { .. } => false,
        };

        // In room mode with thread hiding enabled, drop threaded replies from the
        // fetched batch before inserting it.
        let events = if hide_thread_events {
            events.into_iter().filter(|event| extract_thread_root(event.raw()).is_none()).collect()
        } else {
            events
        };

        // Replace the gap and insert the new events.
        self.chunk.push_backwards_pagination_events(Some(gap_id), new_gap, &events);

        self.propagate_changes();
        self.notify_subscribers(EventsOrigin::Pagination);

        Ok(PaginationResult { events, hit_end_of_timeline: hit_end })
    }
356
357    /// Fetch events for backward room pagination (returns events and optional
358    /// next token).
359    ///
360    /// Returns the events in the same ordering as the one received by the
361    /// server, i.e., newest to oldest.
362    async fn fetch_room_backwards(
363        room: &Room,
364        num_events: u16,
365        token: &str,
366    ) -> Result<(Vec<Event>, Option<String>)> {
367        let mut options = MessagesOptions::backward().from(token);
368        options.limit = UInt::from(num_events);
369
370        let messages = room
371            .messages(options)
372            .await
373            .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?;
374
375        Ok((messages.chunk, messages.end))
376    }
377
378    /// Fetch events for backward thread pagination.
379    ///
380    /// Returns the events in the same ordering as the one received by the
381    /// server, i.e., newest to oldest.
382    async fn fetch_thread_backwards(
383        room: &Room,
384        num_events: u16,
385        token: &str,
386        thread_root: OwnedEventId,
387    ) -> Result<(Vec<Event>, Option<String>)> {
388        let options = RelationsOptions {
389            from: Some(token.to_owned()),
390            dir: Direction::Backward,
391            limit: Some(UInt::from(num_events)),
392            include_relations: IncludeRelations::AllRelations,
393            recurse: true,
394        };
395
396        let mut result = room
397            .relations(thread_root.clone(), options)
398            .await
399            .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?;
400
401        // If we hit the end (no more token), load the thread root event.
402        if result.next_batch_token.is_none() {
403            let root_event = room
404                .load_event(&thread_root)
405                .await
406                .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?;
407            result.chunk.push(root_event);
408        }
409
410        Ok((result.chunk, result.next_batch_token))
411    }
412
    /// Paginate forwards in this event-focused timeline.
    ///
    /// This finds the gap at the back of the linked chunk, fetches newer
    /// events, replaces the gap with the events, and inserts a new gap if
    /// there are more events to fetch.
    ///
    /// When there's no back gap, this returns an empty result with
    /// `hit_end_of_timeline` set to `true` without making a network request.
    #[instrument(skip(self), fields(room_id = %self.room.room_id()))]
    async fn paginate_forwards(&mut self, num_events: u16) -> Result<PaginationResult> {
        let room = self.room.get().ok_or(EventCacheError::ClientDropped)?;

        // Find the gap at the back (forward pagination token).
        let Some((gap_id, gap)) = self.last_chunk_as_gap() else {
            // No gap at back means we've already hit the end of the timeline.
            trace!("no back gap found, already at timeline end");
            return Ok(PaginationResult { events: Vec::new(), hit_end_of_timeline: true });
        };

        let token = gap.token;
        trace!(?token, "paginating forwards with token from back gap");

        // Fetch events based on pagination mode. Note: no reordering is needed
        // here, unlike backwards pagination.
        let (events, new_token) = match &self.pagination_mode {
            EventFocusedPaginationMode::Room { .. } => {
                Self::fetch_room_forwards(&room, num_events, &token).await?
            }
            EventFocusedPaginationMode::Thread { thread_root } => {
                Self::fetch_thread_forwards(&room, num_events, &token, thread_root.clone()).await?
            }
        };

        // The absence of a continuation token means the end of the timeline has
        // been reached, and no new gap must be inserted.
        let hit_end = new_token.is_none();
        let new_gap = new_token.map(|t| Gap { token: t });

        let hide_thread_events = match &self.pagination_mode {
            EventFocusedPaginationMode::Room { hide_thread_events } => *hide_thread_events,
            EventFocusedPaginationMode::Thread { .. } => false,
        };

        // In room mode with thread hiding enabled, drop threaded replies from the
        // fetched batch before inserting it.
        let events = if hide_thread_events {
            events.into_iter().filter(|event| extract_thread_root(event.raw()).is_none()).collect()
        } else {
            events
        };

        // Replace the gap and insert new events.
        self.chunk.push_forwards_pagination_events(Some(gap_id), new_gap, &events);

        self.propagate_changes();
        self.notify_subscribers(EventsOrigin::Pagination);

        Ok(PaginationResult { events, hit_end_of_timeline: hit_end })
    }
464
465    /// Fetch events for forward room pagination.
466    async fn fetch_room_forwards(
467        room: &Room,
468        num_events: u16,
469        token: &str,
470    ) -> Result<(Vec<Event>, Option<String>)> {
471        let mut options = MessagesOptions::new(Direction::Forward);
472        options = options.from(Some(token));
473        options.limit = UInt::from(num_events);
474
475        let messages = room
476            .messages(options)
477            .await
478            .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?;
479
480        Ok((messages.chunk, messages.end))
481    }
482
483    /// Fetch events for forward thread pagination.
484    async fn fetch_thread_forwards(
485        room: &Room,
486        num_events: u16,
487        token: &str,
488        thread_root: OwnedEventId,
489    ) -> Result<(Vec<Event>, Option<String>)> {
490        let options = RelationsOptions {
491            from: Some(token.to_owned()),
492            dir: Direction::Forward,
493            limit: Some(UInt::from(num_events)),
494            include_relations: IncludeRelations::AllRelations,
495            recurse: true,
496        };
497
498        let result = room
499            .relations(thread_root, options)
500            .await
501            .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?;
502
503        Ok((result.chunk, result.next_batch_token))
504    }
505}
506
/// A cache for an event-focused timeline.
///
/// This represents a timeline centered around a specific event (e.g., from a
/// permalink), supporting both forward and backward pagination. The focused
/// event may be part of a thread, in which case pagination will use the
/// `/relations` API instead of `/messages`.
///
/// Pagination tokens are stored as Gap items in the linked chunk itself:
/// - A gap at the **front** (first position) contains the backward pagination
///   token.
/// - A gap at the **back** (last position) contains the forward pagination
///   token.
///
/// This is a shallow data structure, and can be cloned cheaply.
#[derive(Clone)]
pub struct EventFocusedCache {
    /// The shared, lock-protected state: linked chunk, pagination mode and
    /// update channels.
    inner: Arc<RwLock<EventFocusedCacheInner>>,
}
525
526impl EventFocusedCache {
527    /// Create a new empty event-focused cache.
528    pub(super) fn new(
529        room: WeakRoom,
530        focused_event_id: OwnedEventId,
531        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
532    ) -> Self {
533        Self {
534            inner: Arc::new(RwLock::new(EventFocusedCacheInner {
535                room,
536                focused_event_id,
537                pagination_mode: EventFocusedPaginationMode::Room { hide_thread_events: false },
538                chunk: EventLinkedChunk::new(),
539                sender: Sender::new(32),
540                linked_chunk_update_sender,
541            })),
542        }
543    }
544
545    /// Subscribe to updates from this event-focused timeline.
546    pub async fn subscribe(&self) -> (Vec<Event>, Receiver<TimelineVectorDiffs>) {
547        let inner = self.inner.read().await;
548        let events = inner.chunk.events().map(|(_position, item)| item.clone()).collect();
549        let recv = inner.sender.subscribe();
550        (events, recv)
551    }
552
    /// Check if we've hit the start of the timeline (no more backward
    /// pagination possible).
    ///
    /// True when there's no gap (i.e. no backward pagination token) at the
    /// front of the linked chunk.
    pub async fn hit_timeline_start(&self) -> bool {
        self.inner.read().await.first_chunk_as_gap().is_none()
    }

    /// Check if we've hit the end of the timeline (no more forward pagination
    /// possible).
    ///
    /// True when there's no gap (i.e. no forward pagination token) at the
    /// back of the linked chunk.
    pub async fn hit_timeline_end(&self) -> bool {
        self.inner.read().await.last_chunk_as_gap().is_none()
    }
564
    /// Start the event-focused timeline from the focused event, fetching
    /// context events and detecting thread membership.
    ///
    /// Delegates to the inner implementation, under the write lock.
    pub(super) async fn start_from(
        &self,
        room: Room,
        num_context_events: u16,
        thread_mode: EventFocusThreadMode,
    ) -> Result<StartFromResult> {
        self.inner.write().await.start_from(room, num_context_events, thread_mode).await
    }
575
    /// Paginate backwards in this event-focused timeline, be it room or thread
    /// pagination depending on the mode.
    ///
    /// The write lock is held for the whole duration of the pagination.
    pub async fn paginate_backwards(&self, num_events: u16) -> Result<PaginationResult> {
        self.inner.write().await.paginate_backwards(num_events).await
    }

    /// Paginate forwards in this event-focused timeline, be it room or thread
    /// pagination depending on the mode.
    ///
    /// The write lock is held for the whole duration of the pagination.
    pub async fn paginate_forwards(&self, num_events: u16) -> Result<PaginationResult> {
        self.inner.write().await.paginate_forwards(num_events).await
    }
587
588    /// Get the thread root event ID if this linked chunk is in thread mode.
589    pub async fn thread_root(&self) -> Option<OwnedEventId> {
590        match &self.inner.read().await.pagination_mode {
591            EventFocusedPaginationMode::Thread { thread_root } => Some(thread_root.clone()),
592            _ => None,
593        }
594    }
595
596    /// Try to locate the events in the linked chunk corresponding to the given
597    /// list of decrypted events, and replace them, while alerting observers
598    /// about the update.
599    #[cfg(feature = "e2e-encryption")]
600    pub async fn replace_utds(&self, events: &[ResolvedUtd]) {
601        let mut guard = self.inner.write().await;
602        if guard.chunk.replace_utds(events) {
603            guard.propagate_changes();
604            guard.notify_subscribers(EventsOrigin::Cache);
605        }
606    }
607}
608
#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for EventFocusedCache {
    // Manual implementation: the inner state sits behind an async lock, so the
    // output intentionally elides it.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventFocusedCache").finish_non_exhaustive()
    }
}