// matrix_sdk/event_cache/caches/pagination.rs
// Copyright 2026 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The logic to paginate a cache (room, thread…) over the disk or the network.

17use std::{pin::Pin, sync::Arc, time::Duration};
18
19use eyeball::{ObservableWriteGuard, SharedObservable};
20use eyeball_im::VectorDiff;
21use futures_util::{
22    FutureExt as _,
23    future::{Either, Shared, ready},
24};
25use matrix_sdk_base::{
26    SendOutsideWasm, SyncOutsideWasm, event_cache::Event, executor::AbortOnDrop, timeout::timeout,
27};
28use matrix_sdk_common::executor::spawn;
29use tracing::{debug, instrument, trace, warn};
30
31use super::super::Result;
32
/// Type to run paginations.
#[derive(Clone, Debug)]
pub(in super::super) struct Pagination<C: SendOutsideWasm + 'static> {
    /// The underlying cache (room, thread…) over which paginations run.
    pub cache: C,
}
38
39impl<C: SendOutsideWasm + 'static> Pagination<C> {
40    /// Create a new [`Pagination`].
41    pub fn new(cache: C) -> Self {
42        Self { cache }
43    }
44}
45
46impl<C> Pagination<C>
47where
48    C: Clone + PaginatedCache + SendOutsideWasm + 'static + SyncOutsideWasm,
49{
50    /// Starts a back-pagination for the requested number of events.
51    ///
52    /// This automatically takes care of waiting for a pagination token from
53    /// sync, if we haven't done that before.
54    ///
55    /// It will run multiple back-paginations until one of these two conditions
56    /// is met:
57    /// - either we've reached the start of the timeline,
58    /// - or we've obtained enough events to fulfill the requested number of
59    ///   events.
60    #[instrument(skip(self))]
61    pub async fn run_backwards_until(
62        &self,
63        num_requested_events: u16,
64    ) -> Result<BackPaginationOutcome> {
65        let mut events = Vec::new();
66
67        loop {
68            if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
69                events.extend(outcome.events);
70
71                if outcome.reached_start || events.len() >= num_requested_events as usize {
72                    return Ok(BackPaginationOutcome {
73                        reached_start: outcome.reached_start,
74                        events,
75                    });
76                }
77
78                trace!(
79                    "restarting back-pagination, because we haven't reached \
80                     the start or obtained enough events yet"
81                );
82            }
83
84            debug!("restarting back-pagination because of a timeline reset.");
85        }
86    }
87
88    /// Run a single back-pagination for the requested number of events.
89    ///
90    /// This automatically takes care of waiting for a pagination token from
91    /// sync, if we haven't done that before.
92    #[instrument(skip(self))]
93    pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
94        loop {
95            if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
96                return Ok(outcome);
97            }
98
99            debug!("restarting back-pagination because of a timeline reset");
100        }
101    }
102
103    /// Paginate from either the storage or the network, and let pagination
104    /// status observers know about updates.
105    ///
106    /// Returns `Ok(None)` if the pagination token used during a network
107    /// pagination has disappeared from the in-memory linked chunk after
108    /// handling the response.
109    // Implementation note: return a future instead of making the function async, so
110    // as to not cause issues because the `cache` field is borrowed across await
111    // points.
112    fn run_backwards_impl(
113        &self,
114        batch_size: u16,
115    ) -> impl Future<Output = Result<Option<BackPaginationOutcome>>> {
116        // There is at least one gap that must be resolved; reach the network.
117        // First, ensure there's no other ongoing back-pagination.
118        let status_observable = self.cache.status();
119
120        let mut status_guard = status_observable.write();
121
122        match &*status_guard {
123            SharedPaginationStatus::Idle { hit_timeline_start } => {
124                if *hit_timeline_start {
125                    // Force an extra notification for observers.
126                    ObservableWriteGuard::set(
127                        &mut status_guard,
128                        SharedPaginationStatus::Idle { hit_timeline_start: true },
129                    );
130
131                    return Either::Left(ready(Ok(Some(BackPaginationOutcome {
132                        reached_start: true,
133                        events: Vec::new(),
134                    }))));
135                }
136            }
137
138            SharedPaginationStatus::Paginating { shared_task: shared } => {
139                // There was already a back-pagination request in progress; wait for it to
140                // finish and return its result.
141                let shared = shared.clone();
142                drop(status_guard);
143                return Either::Right(shared.fut.clone());
144            }
145        }
146
147        let reset_status_on_drop_guard = ResetStatusOnDrop {
148            prev_status: Some(status_guard.clone()),
149            pagination_status: status_observable.clone(),
150        };
151
152        let this = self.clone();
153
154        let fut: Pin<Box<dyn SharedPaginationFuture>> = Box::pin(async move {
155            match this.paginate_backwards_impl(batch_size).await? {
156                Some(outcome) => {
157                    // Back-pagination's over and successful, don't reset the status to the previous
158                    // value.
159                    reset_status_on_drop_guard.disarm();
160
161                    // Notify subscribers that pagination ended.
162                    this.cache.status().set(SharedPaginationStatus::Idle {
163                        hit_timeline_start: outcome.reached_start,
164                    });
165
166                    Ok(Some(outcome))
167                }
168
169                None => Ok(None),
170            }
171        });
172
173        let shared_task = fut.shared();
174
175        // Start polling in the background, in a spawned task.
176        let shared_task_clone = shared_task.clone();
177        let join_handle = spawn(async move {
178            if let Err(err) = shared_task_clone.await {
179                warn!("event cache back-pagination failed: {err}");
180            }
181        });
182
183        ObservableWriteGuard::set(
184            &mut status_guard,
185            SharedPaginationStatus::Paginating {
186                shared_task: SharedPaginationTask {
187                    fut: shared_task.clone(),
188                    _join_handle: Arc::new(AbortOnDrop::new(join_handle)),
189                },
190            },
191        );
192
193        // Release the shared lock before waiting for the task to complete.
194        drop(status_guard);
195
196        Either::Right(shared_task)
197    }
198
199    /// Paginate from either the storage or the network.
200    ///
201    /// This method isn't concerned with setting the pagination status; only the
202    /// caller is.
203    ///
204    /// Returns `Ok(None)` if the pagination token used during a network
205    /// pagination has disappeared from the in-memory linked chunk after
206    /// handling the response.
207    async fn paginate_backwards_impl(
208        &self,
209        batch_size: u16,
210    ) -> Result<Option<BackPaginationOutcome>> {
211        // A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
212        // to load from disk/storage first, then from network if disk/storage indicated
213        // there's no previous events chunk to load.
214
215        loop {
216            match self.cache.load_more_events_backwards().await? {
217                LoadMoreEventsBackwardsOutcome::Gap {
218                    prev_token,
219                    waited_for_initial_prev_token,
220                } => {
221                    if prev_token.is_none() && !waited_for_initial_prev_token {
222                        // We didn't reload a pagination token, and we haven't waited for one; wait
223                        // and start over.
224
225                        const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
226
227                        // Otherwise, wait for a notification that we received a previous-batch
228                        // token.
229                        trace!("waiting for a pagination token…");
230
231                        let _ = timeout(
232                            self.cache.wait_for_prev_token(),
233                            DEFAULT_WAIT_FOR_TOKEN_DURATION,
234                        )
235                        .await;
236
237                        trace!("done waiting");
238
239                        self.cache.mark_has_waited_for_initial_prev_token().await?;
240
241                        // Retry!
242                        //
243                        // Note: the next call to `load_more_events_backwards` should not return
244                        // `WaitForInitialPrevToken` because we've just marked we've waited for the
245                        // initial `prev_token`, so this is not an infinite loop.
246                        //
247                        // Note 2: not a recursive call, because recursive and async have a bad time
248                        // together.
249                        continue;
250                    }
251
252                    // We have a gap, so resolve it with a network back-pagination.
253                    return self.paginate_backwards_with_network(batch_size, prev_token).await;
254                }
255
256                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
257                    return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
258                }
259
260                LoadMoreEventsBackwardsOutcome::Events {
261                    events,
262                    timeline_event_diffs,
263                    reached_start,
264                } => {
265                    return Ok(Some(
266                        self.cache
267                            .conclude_backwards_pagination_from_disk(
268                                events,
269                                timeline_event_diffs,
270                                reached_start,
271                            )
272                            .await,
273                    ));
274                }
275            }
276        }
277    }
278
279    /// Run a single pagination request to the server.
280    ///
281    /// Returns `Ok(None)` if the pagination token used during the request has
282    /// disappeared from the in-memory linked chunk after handling the
283    /// response.
284    async fn paginate_backwards_with_network(
285        &self,
286        batch_size: u16,
287        prev_token: Option<String>,
288    ) -> Result<Option<BackPaginationOutcome>> {
289        let Some((events, new_token)) =
290            self.cache.paginate_backwards_with_network(batch_size, &prev_token).await?
291        else {
292            // Return an empty default response.
293            return Ok(Some(BackPaginationOutcome {
294                reached_start: false,
295                events: Default::default(),
296            }));
297        };
298
299        self.cache.conclude_backwards_pagination_from_network(events, prev_token, new_token).await
300    }
301}
302
/// Trait alias for the type-erased, shareable future representing an ongoing
/// back-pagination (see [`SharedPaginationTask`]).
trait SharedPaginationFuture:
    Future<Output = Result<Option<BackPaginationOutcome>>> + SendOutsideWasm
{
}
307
// Blanket implementation: any future with the right output that can be sent
// outside WASM automatically is a `SharedPaginationFuture`.
impl<T: Future<Output = Result<Option<BackPaginationOutcome>>> + SendOutsideWasm>
    SharedPaginationFuture for T
{
}
312
/// State for having a pagination run in the background, and be awaited upon by
/// several tasks.
///
/// Such a pagination may be started automatically or manually. It's possible
/// for a manual caller to wait upon its completion, by awaiting the underlying
/// shared future.
#[derive(Clone)]
pub(in super::super) struct SharedPaginationTask {
    /// The shared future for a pagination request running in the background, so
    /// that multiple callers can await it.
    fut: Shared<Pin<Box<dyn SharedPaginationFuture>>>,

    /// The owned task that started the above future.
    ///
    /// Wrapped in an [`Arc`] so that clones of this struct share it; the
    /// spawned task is aborted when the last clone is dropped (via
    /// [`AbortOnDrop`]).
    _join_handle: Arc<AbortOnDrop<()>>,
}
328
/// Internal pagination status, the in-crate counterpart of the public
/// [`PaginationStatus`]; unlike the latter, the `Paginating` variant carries
/// the shared background task.
#[derive(Clone)]
pub(in super::super) enum SharedPaginationStatus {
    /// No pagination is happening right now.
    Idle {
        /// Have we hit the start of the timeline, i.e. paginating wouldn't
        /// have any effect?
        hit_timeline_start: bool,
    },

    /// Pagination is already running in the background.
    Paginating { shared_task: SharedPaginationTask },
}
341
/// The operations a cache (room, thread…) must provide for [`Pagination`] to
/// drive back-paginations over it.
pub(in super::super) trait PaginatedCache {
    /// The observable pagination status for this cache.
    fn status(&self) -> &SharedObservable<SharedPaginationStatus>;

    /// Try to load more events backwards from disk/storage; see
    /// [`LoadMoreEventsBackwardsOutcome`] for the possible results.
    fn load_more_events_backwards(
        &self,
    ) -> impl Future<Output = Result<LoadMoreEventsBackwardsOutcome>> + SendOutsideWasm;

    /// Record that we have waited once for the initial previous-batch token,
    /// so that subsequent calls to `load_more_events_backwards` don't trigger
    /// another wait.
    fn mark_has_waited_for_initial_prev_token(
        &self,
    ) -> impl Future<Output = Result<()>> + SendOutsideWasm;

    /// Resolves when a previous-batch token notification arrives for this
    /// cache (the caller awaits this with a timeout).
    fn wait_for_prev_token(&self) -> impl Future<Output = ()> + SendOutsideWasm;

    /// Run a single network back-pagination request with the given batch size
    /// and previous-batch token.
    ///
    /// Returns the paginated events and the next previous-batch token;
    /// `Ok(None)` is treated by the caller as an empty response.
    fn paginate_backwards_with_network(
        &self,
        batch_size: u16,
        prev_token: &Option<String>,
    ) -> impl Future<Output = Result<Option<(Vec<Event>, Option<String>)>>> + SendOutsideWasm;

    /// Conclude a back-pagination whose events were loaded from disk/storage,
    /// producing the final outcome.
    fn conclude_backwards_pagination_from_disk(
        &self,
        events: Vec<Event>,
        timeline_event_diffs: Vec<VectorDiff<Event>>,
        reached_start: bool,
    ) -> impl Future<Output = BackPaginationOutcome> + SendOutsideWasm;

    /// Conclude a back-pagination that hit the network.
    ///
    /// Returns `Ok(None)` if the pagination token used during the request has
    /// disappeared from the in-memory linked chunk after handling the
    /// response.
    fn conclude_backwards_pagination_from_network(
        &self,
        events: Vec<Event>,
        prev_token: Option<String>,
        new_token: Option<String>,
    ) -> impl Future<Output = Result<Option<BackPaginationOutcome>>> + SendOutsideWasm;
}
375
/// Status for the pagination on a cache.
///
/// This is the public, data-only view of the internal pagination state (it
/// does not expose the background task).
#[derive(Debug, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum PaginationStatus {
    /// No pagination is happening right now.
    Idle {
        /// Have we hit the start of the timeline, i.e. paginating wouldn't
        /// have any effect?
        hit_timeline_start: bool,
    },

    /// Pagination is already running in the background.
    Paginating,
}
390
/// Small RAII guard to reset the pagination status on drop, if not disarmed in
/// the meanwhile.
struct ResetStatusOnDrop {
    /// The status to restore on drop; `None` once the guard has been disarmed.
    prev_status: Option<SharedPaginationStatus>,
    /// The observable to restore the status onto.
    pagination_status: SharedObservable<SharedPaginationStatus>,
}
397
398impl ResetStatusOnDrop {
399    /// Make the RAII guard have no effect.
400    fn disarm(mut self) {
401        self.prev_status = None;
402    }
403}
404
405impl Drop for ResetStatusOnDrop {
406    fn drop(&mut self) {
407        if let Some(status) = self.prev_status.take() {
408            let _ = self.pagination_status.set(status);
409        }
410    }
411}
412
/// The result of a single back-pagination request.
#[derive(Clone, Debug)]
pub struct BackPaginationOutcome {
    /// Did the back-pagination reach the start of the timeline?
    pub reached_start: bool,

    /// All the events that have been returned in the back-pagination
    /// request.
    ///
    /// Events are presented in reverse order: the first element of the vec,
    /// if present, is the most "recent" event from the chunk (or
    /// technically, the last one in the topological ordering).
    pub events: Vec<Event>,
}
427
/// Internal type to represent the output of
/// [`PaginatedCache::load_more_events_backwards`].
#[derive(Debug)]
pub(in super::super) enum LoadMoreEventsBackwardsOutcome {
    /// A gap has been inserted.
    Gap {
        /// The previous batch token to be used as the "end" parameter in the
        /// back-pagination request.
        prev_token: Option<String>,

        /// Whether we've already waited once for the initial previous-batch
        /// token from sync; if not, and `prev_token` is `None`, the caller
        /// waits for a token before paginating over the network.
        waited_for_initial_prev_token: bool,
    },

    /// The start of the timeline has been reached.
    StartOfTimeline,

    /// Events have been inserted.
    ///
    /// Carries the loaded events, the corresponding timeline diffs, and
    /// whether the start of the timeline has been reached.
    Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
}