Skip to main content

matrix_sdk/event_cache/caches/thread/
pagination.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use eyeball::SharedObservable;
18use eyeball_im::VectorDiff;
19use matrix_sdk_base::{
20    event_cache::{Event, Gap},
21    linked_chunk::ChunkContent,
22};
23use ruma::api::Direction;
24use tracing::trace;
25
26use super::{
27    super::super::{
28        EventCacheError, EventsOrigin, Result, TimelineVectorDiffs,
29        caches::pagination::{
30            BackPaginationOutcome, LoadMoreEventsBackwardsOutcome, PaginatedCache, Pagination,
31        },
32    },
33    ThreadEventCacheInner,
34};
35use crate::{
36    event_cache::caches::pagination::SharedPaginationStatus,
37    room::{IncludeRelations, RelationsOptions},
38};
39
/// Intermediate type because the `ThreadEventCache` state is currently owned by
/// `RoomEventCache`.
#[derive(Clone)]
struct ThreadEventCacheWrapper {
    // Shared inner state of the thread event cache this wrapper paginates.
    cache: Arc<ThreadEventCacheInner>,

    // Threads do not support pagination status for the moment but we need one, so let's use a
    // dummy one for now.
    dummy_pagination_status: SharedObservable<SharedPaginationStatus>,
}
50
/// An API object to run pagination queries on a `ThreadEventCache`.
///
/// This is a thin newtype over the generic [`Pagination`] driver, which does
/// the actual work through the `PaginatedCache` implementation of the wrapper.
#[allow(missing_debug_implementations)]
pub struct ThreadPagination(Pagination<ThreadEventCacheWrapper>);
54
55impl ThreadPagination {
56    /// Construct a new [`ThreadPagination`].
57    pub(super) fn new(cache: Arc<ThreadEventCacheInner>) -> Self {
58        Self(Pagination::new(ThreadEventCacheWrapper {
59            cache,
60            dummy_pagination_status: SharedObservable::new(SharedPaginationStatus::Idle {
61                hit_timeline_start: false,
62            }),
63        }))
64    }
65
66    /// Starts a back-pagination for the requested number of events.
67    ///
68    /// This automatically takes care of waiting for a pagination token from
69    /// sync, if we haven't done that before.
70    ///
71    /// It will run multiple back-paginations until one of these two conditions
72    /// is met:
73    /// - either we've reached the start of the timeline,
74    /// - or we've obtained enough events to fulfill the requested number of
75    ///   events.
76    pub async fn run_backwards_until(
77        &self,
78        num_requested_events: u16,
79    ) -> Result<BackPaginationOutcome> {
80        self.0.run_backwards_until(num_requested_events).await
81    }
82
83    /// Run a single back-pagination for the requested number of events.
84    ///
85    /// This automatically takes care of waiting for a pagination token from
86    /// sync, if we haven't done that before.
87    pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
88        self.0.run_backwards_once(batch_size).await
89    }
90}
91
impl PaginatedCache for ThreadEventCacheWrapper {
    fn status(&self) -> &SharedObservable<SharedPaginationStatus> {
        // Threads don't track a real pagination status yet; hand out the dummy
        // one so the generic pagination machinery has something to observe.
        &self.dummy_pagination_status
    }

    async fn load_more_events_backwards(&self) -> Result<LoadMoreEventsBackwardsOutcome> {
        let state = self.cache.state.read().await?;

        // If any in-memory chunk is a gap, don't load more events, and let the caller
        // resolve the gap.
        if let Some(prev_token) = state.thread_linked_chunk().rgap().map(|gap| gap.token) {
            trace!(%prev_token, "thread chunk has at least a gap");

            return Ok(LoadMoreEventsBackwardsOutcome::Gap {
                prev_token: Some(prev_token),
                waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
            });
        }

        // If we don't have a gap, then the first event should be the thread's root;
        // otherwise, we'll restart a pagination from the end.
        if let Some((_pos, event)) = state.thread_linked_chunk().events().next() {
            let first_event_id =
                event.event_id().expect("a linked chunk only stores events with IDs");

            if first_event_id == self.cache.thread_id {
                // The root event being in the chunk means the backwards history
                // is complete: nothing earlier can exist in a thread.
                trace!("thread chunk is fully loaded and non-empty: reached_start=true");

                return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
            }
        }

        // Otherwise, we don't have a gap nor events. We don't have anything. Poor us.
        // Well, it's ok: start a pagination from the end (no previous token).
        Ok(LoadMoreEventsBackwardsOutcome::Gap {
            prev_token: None,
            waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
        })
    }

    async fn mark_has_waited_for_initial_prev_token(&self) -> Result<()> {
        // Flip the flag under the write lock so subsequent paginations see it.
        *self.cache.state.write().await?.waited_for_initial_prev_token_mut() = true;

        Ok(())
    }

    async fn wait_for_prev_token(&self) {
        // TODO: implement once we have persistent storage
    }

    async fn paginate_backwards_with_network(
        &self,
        batch_size: u16,
        prev_token: &Option<String>,
    ) -> Result<Option<(Vec<Event>, Option<String>)>> {
        let Some(room) = self.cache.weak_room.get() else {
            // The client is shutting down.
            return Ok(None);
        };

        // Query the relations endpoint backwards from the previous token (or
        // from the most recent events when there is no token), recursing into
        // nested relations and including all relation types.
        let options = RelationsOptions {
            from: prev_token.clone(),
            dir: Direction::Backward,
            limit: Some(batch_size.into()),
            include_relations: IncludeRelations::AllRelations,
            recurse: true,
        };

        let response = room
            .relations(self.cache.thread_id.clone(), options)
            .await
            .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?;

        // `next_batch_token` is `None` when the server has no earlier events.
        Ok(Some((response.chunk, response.next_batch_token)))
    }

    async fn conclude_backwards_pagination_from_disk(
        &self,
        _events: Vec<Event>,
        _timeline_event_diffs: Vec<VectorDiff<Event>>,
        _reached_start: bool,
    ) -> BackPaginationOutcome {
        unimplemented!("loading from disk for threads is not implemented yet");
    }

    async fn conclude_backwards_pagination_from_network(
        &self,
        mut events: Vec<Event>,
        prev_token: Option<String>,
        new_token: Option<String>,
    ) -> Result<Option<BackPaginationOutcome>> {
        let Some(room) = self.cache.weak_room.get() else {
            // The client is shutting down.
            return Ok(None);
        };

        // No next-batch token from the server means this pagination reached the
        // start of the thread.
        let reached_start = new_token.is_none();

        // Because the state lock is taken again in `load_or_fetch_event`, we need
        // to do this *before* we take the state lock again.
        let root_event = if reached_start {
            // Prepend the thread root event to the results.
            Some(
                room.load_or_fetch_event(&self.cache.thread_id, None)
                    .await
                    .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?,
            )
        } else {
            None
        };

        let mut state = self.cache.state.write().await?;

        // Save all the events (but the thread root) in the store.
        state.save_events(events.iter().cloned()).await?;

        // Note: the events are still in the reversed order at this point, so
        // pushing will eventually make it so that the root event is the first.
        events.extend(root_event);

        let prev_gap_id = if let Some(token) = prev_token {
            // If the gap id is missing, it means that the gap disappeared during
            // pagination; in this case, early return to the caller.
            let Some(gap_id) = state.thread_linked_chunk().chunk_identifier(|chunk| {
                matches!(chunk.content(), ChunkContent::Gap(Gap { token: prev_token }) if *prev_token == token)
            }) else {
                // The previous token has gone missing, so the timeline has been reset in the
                // meanwhile, but it's fine per this function's contract.
                return Ok(None);
            };

            Some(gap_id)
        } else {
            None
        };

        // This is a backwards pagination, so the events were returned in the reverse
        // topological order.
        let topo_ordered_events = events.iter().cloned().rev().collect::<Vec<_>>();
        let new_gap = new_token.map(|token| Gap { token });

        let deduplication = state.filter_duplicate_events(topo_ordered_events);

        let (events, new_gap) = if deduplication.non_empty_all_duplicates {
            // If all events are duplicates, we don't need to do anything; ignore
            // the new events and the new gap.
            (Vec::new(), None)
        } else {
            assert!(
                deduplication.in_store_duplicated_event_ids.is_empty(),
                "persistent storage for threads is not implemented yet"
            );
            // Drop the in-memory duplicates; the freshly paginated copies will
            // be re-inserted below at their correct position.
            state.remove_events(deduplication.in_memory_duplicated_event_ids).await?;

            // Keep events and the gap.
            (deduplication.all_events, new_gap)
        };

        // Add the paginated events to the thread chunk.
        //
        // NOTE(review): this shadows the network-derived `reached_start` above
        // with the linked chunk's own verdict — presumably authoritative once
        // the events have been inserted; confirm against
        // `push_backwards_pagination_events`'s contract.
        let reached_start = state.thread_linked_chunk_mut().push_backwards_pagination_events(
            prev_gap_id,
            new_gap,
            &events,
        );

        state.propagate_changes().await?;

        // Notify observers about the updates.
        let updates = state.thread_linked_chunk_mut().updates_as_vector_diffs();

        if !updates.is_empty() {
            // Send the updates to the listeners; a send error only means there
            // are no subscribers, so it's deliberately ignored.
            let _ = state
                .state
                .sender
                .send(TimelineVectorDiffs { diffs: updates, origin: EventsOrigin::Pagination });
        }

        Ok(Some(BackPaginationOutcome { reached_start, events }))
    }
}
272}