Skip to main content

matrix_sdk/event_cache/caches/
pagination.rs

// Copyright 2026 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! The logic to paginate a cache (room, thread…) over the disk or the network.
16
17use std::time::Duration;
18
19use eyeball::SharedObservable;
20use eyeball_im::VectorDiff;
21use matrix_sdk_base::{event_cache::Event, timeout::timeout};
22use tracing::{debug, instrument, trace};
23
24use super::super::{EventCacheError, Result};
25
26/// Type to run paginations.
27#[derive(Clone, Debug)]
28pub(in super::super) struct Pagination<C> {
29    pub cache: C,
30}
31
32impl<C> Pagination<C> {
33    /// Create a new [`Pagination`].
34    pub fn new(cache: C) -> Self {
35        Self { cache }
36    }
37}
38
39impl<C> Pagination<C>
40where
41    C: PaginatedCache,
42{
43    /// Starts a back-pagination for the requested number of events.
44    ///
45    /// This automatically takes care of waiting for a pagination token from
46    /// sync, if we haven't done that before.
47    ///
48    /// It will run multiple back-paginations until one of these two conditions
49    /// is met:
50    /// - either we've reached the start of the timeline,
51    /// - or we've obtained enough events to fulfill the requested number of
52    ///   events.
53    #[instrument(skip(self))]
54    pub async fn run_backwards_until(
55        &self,
56        num_requested_events: u16,
57    ) -> Result<BackPaginationOutcome> {
58        let mut events = Vec::new();
59
60        loop {
61            if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
62                events.extend(outcome.events);
63
64                if outcome.reached_start || events.len() >= num_requested_events as usize {
65                    return Ok(BackPaginationOutcome {
66                        reached_start: outcome.reached_start,
67                        events,
68                    });
69                }
70
71                trace!(
72                    "restarting back-pagination, because we haven't reached \
73                     the start or obtained enough events yet"
74                );
75            }
76
77            debug!("restarting back-pagination because of a timeline reset.");
78        }
79    }
80
81    /// Run a single back-pagination for the requested number of events.
82    ///
83    /// This automatically takes care of waiting for a pagination token from
84    /// sync, if we haven't done that before.
85    #[instrument(skip(self))]
86    pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
87        loop {
88            if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
89                return Ok(outcome);
90            }
91
92            debug!("restarting back-pagination because of a timeline reset");
93        }
94    }
95
96    /// Paginate from either the storage or the network, and let pagination
97    /// status observers know about updates.
98    ///
99    /// Returns `Ok(None)` if the pagination token used during a network
100    /// pagination has disappeared from the in-memory linked chunk after
101    /// handling the response.
102    async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
103        // There is at least one gap that must be resolved; reach the network.
104        // First, ensure there's no other ongoing back-pagination.
105        let status_observable = self.cache.status();
106
107        let prev_status = status_observable.set(PaginationStatus::Paginating);
108
109        if !matches!(prev_status, PaginationStatus::Idle { .. }) {
110            return Err(EventCacheError::AlreadyBackpaginating);
111        }
112
113        let reset_status_on_drop_guard = ResetStatusOnDrop {
114            prev_status: Some(prev_status),
115            pagination_status: status_observable.clone(),
116        };
117
118        match self.paginate_backwards_impl(batch_size).await? {
119            Some(outcome) => {
120                // Back-pagination's over and successful, don't reset the status to the previous
121                // value.
122                reset_status_on_drop_guard.disarm();
123
124                // Notify subscribers that pagination ended.
125                status_observable
126                    .set(PaginationStatus::Idle { hit_timeline_start: outcome.reached_start });
127
128                Ok(Some(outcome))
129            }
130
131            None => Ok(None),
132        }
133    }
134
135    /// Paginate from either the storage or the network.
136    ///
137    /// This method isn't concerned with setting the pagination status; only the
138    /// caller is.
139    ///
140    /// Returns `Ok(None)` if the pagination token used during a network
141    /// pagination has disappeared from the in-memory linked chunk after
142    /// handling the response.
143    async fn paginate_backwards_impl(
144        &self,
145        batch_size: u16,
146    ) -> Result<Option<BackPaginationOutcome>> {
147        // A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
148        // to load from disk/storage first, then from network if disk/storage indicated
149        // there's no previous events chunk to load.
150
151        loop {
152            match self.cache.load_more_events_backwards().await? {
153                LoadMoreEventsBackwardsOutcome::Gap {
154                    prev_token,
155                    waited_for_initial_prev_token,
156                } => {
157                    if prev_token.is_none() && !waited_for_initial_prev_token {
158                        // We didn't reload a pagination token, and we haven't waited for one; wait
159                        // and start over.
160
161                        const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
162
163                        // Otherwise, wait for a notification that we received a previous-batch
164                        // token.
165                        trace!("waiting for a pagination token…");
166
167                        let _ = timeout(
168                            self.cache.wait_for_prev_token(),
169                            DEFAULT_WAIT_FOR_TOKEN_DURATION,
170                        )
171                        .await;
172
173                        trace!("done waiting");
174
175                        self.cache.mark_has_waited_for_initial_prev_token().await?;
176
177                        // Retry!
178                        //
179                        // Note: the next call to `load_more_events_backwards` should not return
180                        // `WaitForInitialPrevToken` because we've just marked we've waited for the
181                        // initial `prev_token`, so this is not an infinite loop.
182                        //
183                        // Note 2: not a recursive call, because recursive and async have a bad time
184                        // together.
185                        continue;
186                    }
187
188                    // We have a gap, so resolve it with a network back-pagination.
189                    return self.paginate_backwards_with_network(batch_size, prev_token).await;
190                }
191
192                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
193                    return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
194                }
195
196                LoadMoreEventsBackwardsOutcome::Events {
197                    events,
198                    timeline_event_diffs,
199                    reached_start,
200                } => {
201                    return Ok(Some(
202                        self.cache
203                            .conclude_backwards_pagination_from_disk(
204                                events,
205                                timeline_event_diffs,
206                                reached_start,
207                            )
208                            .await,
209                    ));
210                }
211            }
212        }
213    }
214
215    /// Run a single pagination request to the server.
216    ///
217    /// Returns `Ok(None)` if the pagination token used during the request has
218    /// disappeared from the in-memory linked chunk after handling the
219    /// response.
220    async fn paginate_backwards_with_network(
221        &self,
222        batch_size: u16,
223        prev_token: Option<String>,
224    ) -> Result<Option<BackPaginationOutcome>> {
225        let Some((events, new_token)) =
226            self.cache.paginate_backwards_with_network(batch_size, &prev_token).await?
227        else {
228            // Return an empty default response.
229            return Ok(Some(BackPaginationOutcome {
230                reached_start: false,
231                events: Default::default(),
232            }));
233        };
234
235        self.cache.conclude_backwards_pagination_from_network(events, prev_token, new_token).await
236    }
237}
238
239pub(in super::super) trait PaginatedCache {
240    fn status(&self) -> &SharedObservable<PaginationStatus>;
241
242    async fn load_more_events_backwards(&self) -> Result<LoadMoreEventsBackwardsOutcome>;
243    async fn mark_has_waited_for_initial_prev_token(&self) -> Result<()>;
244    async fn wait_for_prev_token(&self);
245
246    async fn paginate_backwards_with_network(
247        &self,
248        batch_size: u16,
249        prev_token: &Option<String>,
250    ) -> Result<Option<(Vec<Event>, Option<String>)>>;
251
252    async fn conclude_backwards_pagination_from_disk(
253        &self,
254        events: Vec<Event>,
255        timeline_event_diffs: Vec<VectorDiff<Event>>,
256        reached_start: bool,
257    ) -> BackPaginationOutcome;
258
259    async fn conclude_backwards_pagination_from_network(
260        &self,
261        events: Vec<Event>,
262        prev_token: Option<String>,
263        new_token: Option<String>,
264    ) -> Result<Option<BackPaginationOutcome>>;
265}
266
/// Status for the pagination on a cache.
///
/// The type is `Copy` and implements full equality (`Eq`), so it can be
/// compared and used wherever an `Eq` bound is required.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum PaginationStatus {
    /// No pagination is happening right now.
    Idle {
        /// Have we hit the start of the timeline, i.e. paginating wouldn't
        /// have any effect?
        hit_timeline_start: bool,
    },

    /// Pagination is already running in the background.
    Paginating,
}
281
282/// Small RAII guard to reset the pagination status on drop, if not disarmed in
283/// the meanwhile.
284struct ResetStatusOnDrop {
285    prev_status: Option<PaginationStatus>,
286    pagination_status: SharedObservable<PaginationStatus>,
287}
288
289impl ResetStatusOnDrop {
290    /// Make the RAII guard have no effect.
291    fn disarm(mut self) {
292        self.prev_status = None;
293    }
294}
295
296impl Drop for ResetStatusOnDrop {
297    fn drop(&mut self) {
298        if let Some(status) = self.prev_status.take() {
299            let _ = self.pagination_status.set(status);
300        }
301    }
302}
303
304/// The result of a single back-pagination request.
305#[derive(Debug)]
306pub struct BackPaginationOutcome {
307    /// Did the back-pagination reach the start of the timeline?
308    pub reached_start: bool,
309
310    /// All the events that have been returned in the back-pagination
311    /// request.
312    ///
313    /// Events are presented in reverse order: the first element of the vec,
314    /// if present, is the most "recent" event from the chunk (or
315    /// technically, the last one in the topological ordering).
316    pub events: Vec<Event>,
317}
318
319/// Internal type to represent the output of
320/// [`PaginatedCache::load_more_events_backwards`].
321#[derive(Debug)]
322pub(in super::super) enum LoadMoreEventsBackwardsOutcome {
323    /// A gap has been inserted.
324    Gap {
325        /// The previous batch token to be used as the "end" parameter in the
326        /// back-pagination request.
327        prev_token: Option<String>,
328
329        waited_for_initial_prev_token: bool,
330    },
331
332    /// The start of the timeline has been reached.
333    StartOfTimeline,
334
335    /// Events have been inserted.
336    Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
337}