matrix_sdk/event_cache/caches/pagination.rs
1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The logic to paginate a cache (room, thread…) over the disk or the network.
16
17use std::{pin::Pin, sync::Arc, time::Duration};
18
19use eyeball::{ObservableWriteGuard, SharedObservable};
20use eyeball_im::VectorDiff;
21use futures_util::{
22 FutureExt as _,
23 future::{Either, Shared, ready},
24};
25use matrix_sdk_base::{
26 SendOutsideWasm, SyncOutsideWasm, event_cache::Event, executor::AbortOnDrop, timeout::timeout,
27};
28use matrix_sdk_common::executor::spawn;
29use tracing::{debug, instrument, trace, warn};
30
31use super::super::Result;
32
33/// Type to run paginations.
34#[derive(Clone, Debug)]
35pub(in super::super) struct Pagination<C: SendOutsideWasm + 'static> {
36 pub cache: C,
37}
38
39impl<C: SendOutsideWasm + 'static> Pagination<C> {
40 /// Create a new [`Pagination`].
41 pub fn new(cache: C) -> Self {
42 Self { cache }
43 }
44}
45
46impl<C> Pagination<C>
47where
48 C: Clone + PaginatedCache + SendOutsideWasm + 'static + SyncOutsideWasm,
49{
50 /// Starts a back-pagination for the requested number of events.
51 ///
52 /// This automatically takes care of waiting for a pagination token from
53 /// sync, if we haven't done that before.
54 ///
55 /// It will run multiple back-paginations until one of these two conditions
56 /// is met:
57 /// - either we've reached the start of the timeline,
58 /// - or we've obtained enough events to fulfill the requested number of
59 /// events.
60 #[instrument(skip(self))]
61 pub async fn run_backwards_until(
62 &self,
63 num_requested_events: u16,
64 ) -> Result<BackPaginationOutcome> {
65 let mut events = Vec::new();
66
67 loop {
68 if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
69 events.extend(outcome.events);
70
71 if outcome.reached_start || events.len() >= num_requested_events as usize {
72 return Ok(BackPaginationOutcome {
73 reached_start: outcome.reached_start,
74 events,
75 });
76 }
77
78 trace!(
79 "restarting back-pagination, because we haven't reached \
80 the start or obtained enough events yet"
81 );
82 }
83
84 debug!("restarting back-pagination because of a timeline reset.");
85 }
86 }
87
88 /// Run a single back-pagination for the requested number of events.
89 ///
90 /// This automatically takes care of waiting for a pagination token from
91 /// sync, if we haven't done that before.
92 #[instrument(skip(self))]
93 pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
94 loop {
95 if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
96 return Ok(outcome);
97 }
98
99 debug!("restarting back-pagination because of a timeline reset");
100 }
101 }
102
103 /// Paginate from either the storage or the network, and let pagination
104 /// status observers know about updates.
105 ///
106 /// Returns `Ok(None)` if the pagination token used during a network
107 /// pagination has disappeared from the in-memory linked chunk after
108 /// handling the response.
109 // Implementation note: return a future instead of making the function async, so
110 // as to not cause issues because the `cache` field is borrowed across await
111 // points.
112 fn run_backwards_impl(
113 &self,
114 batch_size: u16,
115 ) -> impl Future<Output = Result<Option<BackPaginationOutcome>>> {
116 // There is at least one gap that must be resolved; reach the network.
117 // First, ensure there's no other ongoing back-pagination.
118 let status_observable = self.cache.status();
119
120 let mut status_guard = status_observable.write();
121
122 match &*status_guard {
123 SharedPaginationStatus::Idle { hit_timeline_start } => {
124 if *hit_timeline_start {
125 // Force an extra notification for observers.
126 ObservableWriteGuard::set(
127 &mut status_guard,
128 SharedPaginationStatus::Idle { hit_timeline_start: true },
129 );
130
131 return Either::Left(ready(Ok(Some(BackPaginationOutcome {
132 reached_start: true,
133 events: Vec::new(),
134 }))));
135 }
136 }
137
138 SharedPaginationStatus::Paginating { shared_task: shared } => {
139 // There was already a back-pagination request in progress; wait for it to
140 // finish and return its result.
141 let shared = shared.clone();
142 drop(status_guard);
143 return Either::Right(shared.fut.clone());
144 }
145 }
146
147 let reset_status_on_drop_guard = ResetStatusOnDrop {
148 prev_status: Some(status_guard.clone()),
149 pagination_status: status_observable.clone(),
150 };
151
152 let this = self.clone();
153
154 let fut: Pin<Box<dyn SharedPaginationFuture>> = Box::pin(async move {
155 match this.paginate_backwards_impl(batch_size).await? {
156 Some(outcome) => {
157 // Back-pagination's over and successful, don't reset the status to the previous
158 // value.
159 reset_status_on_drop_guard.disarm();
160
161 // Notify subscribers that pagination ended.
162 this.cache.status().set(SharedPaginationStatus::Idle {
163 hit_timeline_start: outcome.reached_start,
164 });
165
166 Ok(Some(outcome))
167 }
168
169 None => Ok(None),
170 }
171 });
172
173 let shared_task = fut.shared();
174
175 // Start polling in the background, in a spawned task.
176 let shared_task_clone = shared_task.clone();
177 let join_handle = spawn(async move {
178 if let Err(err) = shared_task_clone.await {
179 warn!("event cache back-pagination failed: {err}");
180 }
181 });
182
183 ObservableWriteGuard::set(
184 &mut status_guard,
185 SharedPaginationStatus::Paginating {
186 shared_task: SharedPaginationTask {
187 fut: shared_task.clone(),
188 _join_handle: Arc::new(AbortOnDrop::new(join_handle)),
189 },
190 },
191 );
192
193 // Release the shared lock before waiting for the task to complete.
194 drop(status_guard);
195
196 Either::Right(shared_task)
197 }
198
199 /// Paginate from either the storage or the network.
200 ///
201 /// This method isn't concerned with setting the pagination status; only the
202 /// caller is.
203 ///
204 /// Returns `Ok(None)` if the pagination token used during a network
205 /// pagination has disappeared from the in-memory linked chunk after
206 /// handling the response.
207 async fn paginate_backwards_impl(
208 &self,
209 batch_size: u16,
210 ) -> Result<Option<BackPaginationOutcome>> {
211 // A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
212 // to load from disk/storage first, then from network if disk/storage indicated
213 // there's no previous events chunk to load.
214
215 loop {
216 match self.cache.load_more_events_backwards().await? {
217 LoadMoreEventsBackwardsOutcome::Gap {
218 prev_token,
219 waited_for_initial_prev_token,
220 } => {
221 if prev_token.is_none() && !waited_for_initial_prev_token {
222 // We didn't reload a pagination token, and we haven't waited for one; wait
223 // and start over.
224
225 const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
226
227 // Otherwise, wait for a notification that we received a previous-batch
228 // token.
229 trace!("waiting for a pagination token…");
230
231 let _ = timeout(
232 self.cache.wait_for_prev_token(),
233 DEFAULT_WAIT_FOR_TOKEN_DURATION,
234 )
235 .await;
236
237 trace!("done waiting");
238
239 self.cache.mark_has_waited_for_initial_prev_token().await?;
240
241 // Retry!
242 //
243 // Note: the next call to `load_more_events_backwards` should not return
244 // `WaitForInitialPrevToken` because we've just marked we've waited for the
245 // initial `prev_token`, so this is not an infinite loop.
246 //
247 // Note 2: not a recursive call, because recursive and async have a bad time
248 // together.
249 continue;
250 }
251
252 // We have a gap, so resolve it with a network back-pagination.
253 return self.paginate_backwards_with_network(batch_size, prev_token).await;
254 }
255
256 LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
257 return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
258 }
259
260 LoadMoreEventsBackwardsOutcome::Events {
261 events,
262 timeline_event_diffs,
263 reached_start,
264 } => {
265 return Ok(Some(
266 self.cache
267 .conclude_backwards_pagination_from_disk(
268 events,
269 timeline_event_diffs,
270 reached_start,
271 )
272 .await,
273 ));
274 }
275 }
276 }
277 }
278
279 /// Run a single pagination request to the server.
280 ///
281 /// Returns `Ok(None)` if the pagination token used during the request has
282 /// disappeared from the in-memory linked chunk after handling the
283 /// response.
284 async fn paginate_backwards_with_network(
285 &self,
286 batch_size: u16,
287 prev_token: Option<String>,
288 ) -> Result<Option<BackPaginationOutcome>> {
289 let Some((events, new_token)) =
290 self.cache.paginate_backwards_with_network(batch_size, &prev_token).await?
291 else {
292 // Return an empty default response.
293 return Ok(Some(BackPaginationOutcome {
294 reached_start: false,
295 events: Default::default(),
296 }));
297 };
298
299 self.cache.conclude_backwards_pagination_from_network(events, prev_token, new_token).await
300 }
301}
302
303trait SharedPaginationFuture:
304 Future<Output = Result<Option<BackPaginationOutcome>>> + SendOutsideWasm
305{
306}
307
308impl<T: Future<Output = Result<Option<BackPaginationOutcome>>> + SendOutsideWasm>
309 SharedPaginationFuture for T
310{
311}
312
313/// State for having a pagination run in the background, and be awaited upon by
314/// several tasks.
315///
316/// Such a pagination may be started automatically or manually. It's possible
317/// for a manual caller to wait upon its completion, by awaiting the underlying
318/// shared future.
319#[derive(Clone)]
320pub(in super::super) struct SharedPaginationTask {
321 /// The shared future for a pagination request running in the background, so
322 /// that multiple callers can await it.
323 fut: Shared<Pin<Box<dyn SharedPaginationFuture>>>,
324
325 /// The owned task that started the above future.
326 _join_handle: Arc<AbortOnDrop<()>>,
327}
328
329#[derive(Clone)]
330pub(in super::super) enum SharedPaginationStatus {
331 /// No pagination is happening right now.
332 Idle {
333 /// Have we hit the start of the timeline, i.e. paginating wouldn't
334 /// have any effect?
335 hit_timeline_start: bool,
336 },
337
338 /// Pagination is already running in the background.
339 Paginating { shared_task: SharedPaginationTask },
340}
341
342pub(in super::super) trait PaginatedCache {
343 fn status(&self) -> &SharedObservable<SharedPaginationStatus>;
344
345 fn load_more_events_backwards(
346 &self,
347 ) -> impl Future<Output = Result<LoadMoreEventsBackwardsOutcome>> + SendOutsideWasm;
348
349 fn mark_has_waited_for_initial_prev_token(
350 &self,
351 ) -> impl Future<Output = Result<()>> + SendOutsideWasm;
352
353 fn wait_for_prev_token(&self) -> impl Future<Output = ()> + SendOutsideWasm;
354
355 fn paginate_backwards_with_network(
356 &self,
357 batch_size: u16,
358 prev_token: &Option<String>,
359 ) -> impl Future<Output = Result<Option<(Vec<Event>, Option<String>)>>> + SendOutsideWasm;
360
361 fn conclude_backwards_pagination_from_disk(
362 &self,
363 events: Vec<Event>,
364 timeline_event_diffs: Vec<VectorDiff<Event>>,
365 reached_start: bool,
366 ) -> impl Future<Output = BackPaginationOutcome> + SendOutsideWasm;
367
368 fn conclude_backwards_pagination_from_network(
369 &self,
370 events: Vec<Event>,
371 prev_token: Option<String>,
372 new_token: Option<String>,
373 ) -> impl Future<Output = Result<Option<BackPaginationOutcome>>> + SendOutsideWasm;
374}
375
376/// Status for the pagination on a cache.
377#[derive(Debug, PartialEq, Clone, Copy)]
378#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
379pub enum PaginationStatus {
380 /// No pagination is happening right now.
381 Idle {
382 /// Have we hit the start of the timeline, i.e. paginating wouldn't
383 /// have any effect?
384 hit_timeline_start: bool,
385 },
386
387 /// Pagination is already running in the background.
388 Paginating,
389}
390
391/// Small RAII guard to reset the pagination status on drop, if not disarmed in
392/// the meanwhile.
393struct ResetStatusOnDrop {
394 prev_status: Option<SharedPaginationStatus>,
395 pagination_status: SharedObservable<SharedPaginationStatus>,
396}
397
398impl ResetStatusOnDrop {
399 /// Make the RAII guard have no effect.
400 fn disarm(mut self) {
401 self.prev_status = None;
402 }
403}
404
405impl Drop for ResetStatusOnDrop {
406 fn drop(&mut self) {
407 if let Some(status) = self.prev_status.take() {
408 let _ = self.pagination_status.set(status);
409 }
410 }
411}
412
413/// The result of a single back-pagination request.
414#[derive(Clone, Debug)]
415pub struct BackPaginationOutcome {
416 /// Did the back-pagination reach the start of the timeline?
417 pub reached_start: bool,
418
419 /// All the events that have been returned in the back-pagination
420 /// request.
421 ///
422 /// Events are presented in reverse order: the first element of the vec,
423 /// if present, is the most "recent" event from the chunk (or
424 /// technically, the last one in the topological ordering).
425 pub events: Vec<Event>,
426}
427
428/// Internal type to represent the output of
429/// [`PaginatedCache::load_more_events_backwards`].
430#[derive(Debug)]
431pub(in super::super) enum LoadMoreEventsBackwardsOutcome {
432 /// A gap has been inserted.
433 Gap {
434 /// The previous batch token to be used as the "end" parameter in the
435 /// back-pagination request.
436 prev_token: Option<String>,
437
438 waited_for_initial_prev_token: bool,
439 },
440
441 /// The start of the timeline has been reached.
442 StartOfTimeline,
443
444 /// Events have been inserted.
445 Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
446}