matrix_sdk/event_cache/caches/pagination.rs
1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The logic to paginate a cache (room, thread…) over the disk or the network.
16
17use std::time::Duration;
18
19use eyeball::SharedObservable;
20use eyeball_im::VectorDiff;
21use matrix_sdk_base::{event_cache::Event, timeout::timeout};
22use tracing::{debug, instrument, trace};
23
24use super::super::{EventCacheError, Result};
25
26/// Type to run paginations.
27#[derive(Clone, Debug)]
28pub(in super::super) struct Pagination<C> {
29 pub cache: C,
30}
31
32impl<C> Pagination<C> {
33 /// Create a new [`Pagination`].
34 pub fn new(cache: C) -> Self {
35 Self { cache }
36 }
37}
38
39impl<C> Pagination<C>
40where
41 C: PaginatedCache,
42{
43 /// Starts a back-pagination for the requested number of events.
44 ///
45 /// This automatically takes care of waiting for a pagination token from
46 /// sync, if we haven't done that before.
47 ///
48 /// It will run multiple back-paginations until one of these two conditions
49 /// is met:
50 /// - either we've reached the start of the timeline,
51 /// - or we've obtained enough events to fulfill the requested number of
52 /// events.
53 #[instrument(skip(self))]
54 pub async fn run_backwards_until(
55 &self,
56 num_requested_events: u16,
57 ) -> Result<BackPaginationOutcome> {
58 let mut events = Vec::new();
59
60 loop {
61 if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
62 events.extend(outcome.events);
63
64 if outcome.reached_start || events.len() >= num_requested_events as usize {
65 return Ok(BackPaginationOutcome {
66 reached_start: outcome.reached_start,
67 events,
68 });
69 }
70
71 trace!(
72 "restarting back-pagination, because we haven't reached \
73 the start or obtained enough events yet"
74 );
75 }
76
77 debug!("restarting back-pagination because of a timeline reset.");
78 }
79 }
80
81 /// Run a single back-pagination for the requested number of events.
82 ///
83 /// This automatically takes care of waiting for a pagination token from
84 /// sync, if we haven't done that before.
85 #[instrument(skip(self))]
86 pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
87 loop {
88 if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
89 return Ok(outcome);
90 }
91
92 debug!("restarting back-pagination because of a timeline reset");
93 }
94 }
95
96 /// Paginate from either the storage or the network, and let pagination
97 /// status observers know about updates.
98 ///
99 /// Returns `Ok(None)` if the pagination token used during a network
100 /// pagination has disappeared from the in-memory linked chunk after
101 /// handling the response.
102 async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
103 // There is at least one gap that must be resolved; reach the network.
104 // First, ensure there's no other ongoing back-pagination.
105 let status_observable = self.cache.status();
106
107 let prev_status = status_observable.set(PaginationStatus::Paginating);
108
109 if !matches!(prev_status, PaginationStatus::Idle { .. }) {
110 return Err(EventCacheError::AlreadyBackpaginating);
111 }
112
113 let reset_status_on_drop_guard = ResetStatusOnDrop {
114 prev_status: Some(prev_status),
115 pagination_status: status_observable.clone(),
116 };
117
118 match self.paginate_backwards_impl(batch_size).await? {
119 Some(outcome) => {
120 // Back-pagination's over and successful, don't reset the status to the previous
121 // value.
122 reset_status_on_drop_guard.disarm();
123
124 // Notify subscribers that pagination ended.
125 status_observable
126 .set(PaginationStatus::Idle { hit_timeline_start: outcome.reached_start });
127
128 Ok(Some(outcome))
129 }
130
131 None => Ok(None),
132 }
133 }
134
135 /// Paginate from either the storage or the network.
136 ///
137 /// This method isn't concerned with setting the pagination status; only the
138 /// caller is.
139 ///
140 /// Returns `Ok(None)` if the pagination token used during a network
141 /// pagination has disappeared from the in-memory linked chunk after
142 /// handling the response.
143 async fn paginate_backwards_impl(
144 &self,
145 batch_size: u16,
146 ) -> Result<Option<BackPaginationOutcome>> {
147 // A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
148 // to load from disk/storage first, then from network if disk/storage indicated
149 // there's no previous events chunk to load.
150
151 loop {
152 match self.cache.load_more_events_backwards().await? {
153 LoadMoreEventsBackwardsOutcome::Gap {
154 prev_token,
155 waited_for_initial_prev_token,
156 } => {
157 if prev_token.is_none() && !waited_for_initial_prev_token {
158 // We didn't reload a pagination token, and we haven't waited for one; wait
159 // and start over.
160
161 const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
162
163 // Otherwise, wait for a notification that we received a previous-batch
164 // token.
165 trace!("waiting for a pagination token…");
166
167 let _ = timeout(
168 self.cache.wait_for_prev_token(),
169 DEFAULT_WAIT_FOR_TOKEN_DURATION,
170 )
171 .await;
172
173 trace!("done waiting");
174
175 self.cache.mark_has_waited_for_initial_prev_token().await?;
176
177 // Retry!
178 //
179 // Note: the next call to `load_more_events_backwards` should not return
180 // `WaitForInitialPrevToken` because we've just marked we've waited for the
181 // initial `prev_token`, so this is not an infinite loop.
182 //
183 // Note 2: not a recursive call, because recursive and async have a bad time
184 // together.
185 continue;
186 }
187
188 // We have a gap, so resolve it with a network back-pagination.
189 return self.paginate_backwards_with_network(batch_size, prev_token).await;
190 }
191
192 LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
193 return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
194 }
195
196 LoadMoreEventsBackwardsOutcome::Events {
197 events,
198 timeline_event_diffs,
199 reached_start,
200 } => {
201 return Ok(Some(
202 self.cache
203 .conclude_backwards_pagination_from_disk(
204 events,
205 timeline_event_diffs,
206 reached_start,
207 )
208 .await,
209 ));
210 }
211 }
212 }
213 }
214
215 /// Run a single pagination request to the server.
216 ///
217 /// Returns `Ok(None)` if the pagination token used during the request has
218 /// disappeared from the in-memory linked chunk after handling the
219 /// response.
220 async fn paginate_backwards_with_network(
221 &self,
222 batch_size: u16,
223 prev_token: Option<String>,
224 ) -> Result<Option<BackPaginationOutcome>> {
225 let Some((events, new_token)) =
226 self.cache.paginate_backwards_with_network(batch_size, &prev_token).await?
227 else {
228 // Return an empty default response.
229 return Ok(Some(BackPaginationOutcome {
230 reached_start: false,
231 events: Default::default(),
232 }));
233 };
234
235 self.cache.conclude_backwards_pagination_from_network(events, prev_token, new_token).await
236 }
237}
238
239pub(in super::super) trait PaginatedCache {
240 fn status(&self) -> &SharedObservable<PaginationStatus>;
241
242 async fn load_more_events_backwards(&self) -> Result<LoadMoreEventsBackwardsOutcome>;
243 async fn mark_has_waited_for_initial_prev_token(&self) -> Result<()>;
244 async fn wait_for_prev_token(&self);
245
246 async fn paginate_backwards_with_network(
247 &self,
248 batch_size: u16,
249 prev_token: &Option<String>,
250 ) -> Result<Option<(Vec<Event>, Option<String>)>>;
251
252 async fn conclude_backwards_pagination_from_disk(
253 &self,
254 events: Vec<Event>,
255 timeline_event_diffs: Vec<VectorDiff<Event>>,
256 reached_start: bool,
257 ) -> BackPaginationOutcome;
258
259 async fn conclude_backwards_pagination_from_network(
260 &self,
261 events: Vec<Event>,
262 prev_token: Option<String>,
263 new_token: Option<String>,
264 ) -> Result<Option<BackPaginationOutcome>>;
265}
266
267/// Status for the pagination on a cache.
268#[derive(Debug, PartialEq, Clone, Copy)]
269#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
270pub enum PaginationStatus {
271 /// No pagination is happening right now.
272 Idle {
273 /// Have we hit the start of the timeline, i.e. paginating wouldn't
274 /// have any effect?
275 hit_timeline_start: bool,
276 },
277
278 /// Pagination is already running in the background.
279 Paginating,
280}
281
282/// Small RAII guard to reset the pagination status on drop, if not disarmed in
283/// the meanwhile.
284struct ResetStatusOnDrop {
285 prev_status: Option<PaginationStatus>,
286 pagination_status: SharedObservable<PaginationStatus>,
287}
288
289impl ResetStatusOnDrop {
290 /// Make the RAII guard have no effect.
291 fn disarm(mut self) {
292 self.prev_status = None;
293 }
294}
295
296impl Drop for ResetStatusOnDrop {
297 fn drop(&mut self) {
298 if let Some(status) = self.prev_status.take() {
299 let _ = self.pagination_status.set(status);
300 }
301 }
302}
303
304/// The result of a single back-pagination request.
305#[derive(Debug)]
306pub struct BackPaginationOutcome {
307 /// Did the back-pagination reach the start of the timeline?
308 pub reached_start: bool,
309
310 /// All the events that have been returned in the back-pagination
311 /// request.
312 ///
313 /// Events are presented in reverse order: the first element of the vec,
314 /// if present, is the most "recent" event from the chunk (or
315 /// technically, the last one in the topological ordering).
316 pub events: Vec<Event>,
317}
318
319/// Internal type to represent the output of
320/// [`PaginatedCache::load_more_events_backwards`].
321#[derive(Debug)]
322pub(in super::super) enum LoadMoreEventsBackwardsOutcome {
323 /// A gap has been inserted.
324 Gap {
325 /// The previous batch token to be used as the "end" parameter in the
326 /// back-pagination request.
327 prev_token: Option<String>,
328
329 waited_for_initial_prev_token: bool,
330 },
331
332 /// The start of the timeline has been reached.
333 StartOfTimeline,
334
335 /// Events have been inserted.
336 Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
337}