matrix_sdk/event_cache/pagination.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A sub-object for running pagination tasks on a given room.
16
17use std::{
18 sync::{Arc, atomic::Ordering},
19 time::Duration,
20};
21
22use eyeball::{SharedObservable, Subscriber};
23use matrix_sdk_base::timeout::timeout;
24use ruma::api::Direction;
25use tracing::{debug, instrument, trace};
26
27use super::{
28 BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheUpdate,
29 room::{LoadMoreEventsBackwardsOutcome, RoomEventCacheInner},
30};
31use crate::{
32 event_cache::{EventCacheError, RoomEventCacheGenericUpdate},
33 room::MessagesOptions,
34};
35
/// Status of the back-pagination on a room event cache.
///
/// The status is a plain value type: it can be copied freely and compared
/// with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum RoomPaginationStatus {
    /// No back-pagination is happening right now.
    Idle {
        /// Have we hit the start of the timeline, i.e. back-paginating
        /// wouldn't have any effect?
        hit_timeline_start: bool,
    },

    /// Back-pagination is already running in the background.
    Paginating,
}
50
51/// Small RAII guard to reset the pagination status on drop, if not disarmed in
52/// the meanwhile.
53struct ResetStatusOnDrop {
54 prev_status: Option<RoomPaginationStatus>,
55 pagination_status: SharedObservable<RoomPaginationStatus>,
56}
57
58impl ResetStatusOnDrop {
59 /// Make the RAII guard have no effect.
60 fn disarm(mut self) {
61 self.prev_status = None;
62 }
63}
64
65impl Drop for ResetStatusOnDrop {
66 fn drop(&mut self) {
67 if let Some(status) = self.prev_status.take() {
68 let _ = self.pagination_status.set(status);
69 }
70 }
71}
72
73/// An API object to run pagination queries on a [`super::RoomEventCache`].
74///
75/// Can be created with [`super::RoomEventCache::pagination()`].
76#[allow(missing_debug_implementations)]
77#[derive(Clone)]
78pub struct RoomPagination {
79 pub(super) inner: Arc<RoomEventCacheInner>,
80}
81
82impl RoomPagination {
83 /// Starts a back-pagination for the requested number of events.
84 ///
85 /// This automatically takes care of waiting for a pagination token from
86 /// sync, if we haven't done that before.
87 ///
88 /// It will run multiple back-paginations until one of these two conditions
89 /// is met:
90 /// - either we've reached the start of the timeline,
91 /// - or we've obtained enough events to fulfill the requested number of
92 /// events.
93 #[instrument(skip(self))]
94 pub async fn run_backwards_until(
95 &self,
96 num_requested_events: u16,
97 ) -> Result<BackPaginationOutcome> {
98 let mut events = Vec::new();
99
100 loop {
101 if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
102 events.extend(outcome.events);
103 if outcome.reached_start || events.len() >= num_requested_events as usize {
104 return Ok(BackPaginationOutcome {
105 reached_start: outcome.reached_start,
106 events,
107 });
108 }
109 trace!(
110 "restarting back-pagination, because we haven't reached \
111 the start or obtained enough events yet"
112 );
113 }
114
115 debug!("restarting back-pagination because of a timeline reset.");
116 }
117 }
118
119 /// Run a single back-pagination for the requested number of events.
120 ///
121 /// This automatically takes care of waiting for a pagination token from
122 /// sync, if we haven't done that before.
123 #[instrument(skip(self))]
124 pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
125 loop {
126 if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
127 return Ok(outcome);
128 }
129 debug!("restarting back-pagination because of a timeline reset.");
130 }
131 }
132
133 /// Paginate from either the storage or the network, and let pagination
134 /// status observers know about updates.
135 ///
136 /// Returns `Ok(None)` if the pagination token used during a network
137 /// pagination has disappeared from the in-memory linked chunk after
138 /// handling the response.
139 async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
140 // There is at least one gap that must be resolved; reach the network.
141 // First, ensure there's no other ongoing back-pagination.
142 let status_observable = &self.inner.pagination_status;
143
144 let prev_status = status_observable.set(RoomPaginationStatus::Paginating);
145 if !matches!(prev_status, RoomPaginationStatus::Idle { .. }) {
146 return Err(EventCacheError::AlreadyBackpaginating);
147 }
148
149 let reset_status_on_drop_guard = ResetStatusOnDrop {
150 prev_status: Some(prev_status),
151 pagination_status: status_observable.clone(),
152 };
153
154 match self.paginate_backwards_impl(batch_size).await? {
155 Some(outcome) => {
156 // Back-pagination's over and successful, don't reset the status to the previous
157 // value.
158 reset_status_on_drop_guard.disarm();
159
160 // Notify subscribers that pagination ended.
161 status_observable
162 .set(RoomPaginationStatus::Idle { hit_timeline_start: outcome.reached_start });
163
164 Ok(Some(outcome))
165 }
166
167 None => Ok(None),
168 }
169 }
170
171 /// Paginate from either the storage or the network.
172 ///
173 /// This method isn't concerned with setting the pagination status; only the
174 /// caller is.
175 ///
176 /// Returns `Ok(None)` if the pagination token used during a network
177 /// pagination has disappeared from the in-memory linked chunk after
178 /// handling the response.
179 async fn paginate_backwards_impl(
180 &self,
181 batch_size: u16,
182 ) -> Result<Option<BackPaginationOutcome>> {
183 // A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
184 // to load from storage first, then from network if storage indicated
185 // there's no previous events chunk to load.
186
187 loop {
188 let mut state_guard = self.inner.state.write().await?;
189
190 match state_guard.load_more_events_backwards().await? {
191 LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
192 if prev_token.is_none()
193 && !state_guard.waited_for_initial_prev_token().load(Ordering::SeqCst)
194 {
195 // We didn't reload a pagination token, and we haven't waited for one; wait
196 // and start over.
197
198 const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
199
200 // Release the state guard while waiting, to not deadlock the sync task.
201 drop(state_guard);
202
203 // Otherwise, wait for a notification that we received a previous-batch
204 // token.
205 trace!("waiting for a pagination token…");
206 let _ = timeout(
207 self.inner.pagination_batch_token_notifier.notified(),
208 DEFAULT_WAIT_FOR_TOKEN_DURATION,
209 )
210 .await;
211 trace!("done waiting");
212
213 self.inner
214 .state
215 .write()
216 .await?
217 .waited_for_initial_prev_token()
218 .store(true, Ordering::SeqCst);
219
220 // Retry!
221 //
222 // Note: the next call to `load_more_events_backwards` can't return
223 // `WaitForInitialPrevToken` because we've just set to
224 // `waited_for_initial_prev_token`, so this is not an infinite loop.
225 //
226 // Note 2: not a recursive call, because recursive and async have a bad time
227 // together.
228 continue;
229 }
230
231 // We have a gap, so resolve it with a network back-pagination.
232 drop(state_guard);
233 return self.paginate_backwards_with_network(batch_size, prev_token).await;
234 }
235
236 LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
237 return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
238 }
239
240 LoadMoreEventsBackwardsOutcome::Events {
241 events,
242 timeline_event_diffs,
243 reached_start,
244 } => {
245 if !timeline_event_diffs.is_empty() {
246 let _ = self.inner.update_sender.send(
247 RoomEventCacheUpdate::UpdateTimelineEvents {
248 diffs: timeline_event_diffs,
249 origin: EventsOrigin::Cache,
250 },
251 );
252
253 // Send a room event cache generic update.
254 let _ =
255 self.inner.generic_update_sender.send(RoomEventCacheGenericUpdate {
256 room_id: self.inner.room_id.clone(),
257 });
258 }
259
260 return Ok(Some(BackPaginationOutcome {
261 reached_start,
262 // This is a backwards pagination. `BackPaginationOutcome` expects events to
263 // be in “reverse order”.
264 events: events.into_iter().rev().collect(),
265 }));
266 }
267 }
268 }
269 }
270
271 /// Run a single pagination request (/messages) to the server.
272 ///
273 /// If there are no previous-batch tokens, it will wait for one for a short
274 /// while to get one, or if it's already done so or if it's seen a
275 /// previous-batch token before, it will immediately indicate it's
276 /// reached the end of the timeline.
277 ///
278 /// Returns `Ok(None)` if the pagination token used during the request has
279 /// disappeared from the in-memory linked chunk after handling the
280 /// response.
281 async fn paginate_backwards_with_network(
282 &self,
283 batch_size: u16,
284 prev_token: Option<String>,
285 ) -> Result<Option<BackPaginationOutcome>> {
286 let (events, new_token) = {
287 let Some(room) = self.inner.weak_room.get() else {
288 // The client is shutting down, return an empty default response.
289 return Ok(Some(BackPaginationOutcome {
290 reached_start: false,
291 events: Default::default(),
292 }));
293 };
294
295 let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
296 options.limit = batch_size.into();
297
298 let response = room
299 .messages(options)
300 .await
301 .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
302
303 (response.chunk, response.end)
304 };
305
306 if let Some((outcome, timeline_event_diffs)) = self
307 .inner
308 .state
309 .write()
310 .await?
311 .handle_backpagination(events, new_token, prev_token)
312 .await?
313 {
314 if !timeline_event_diffs.is_empty() {
315 let _ = self.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
316 diffs: timeline_event_diffs,
317 origin: EventsOrigin::Pagination,
318 });
319 }
320
321 Ok(Some(outcome))
322 } else {
323 // The previous token has gone missing, so the timeline has been reset in the
324 // meanwhile, but it's fine per this function's contract.
325 Ok(None)
326 }
327 }
328
329 /// Returns a subscriber to the pagination status used for the
330 /// back-pagination integrated to the event cache.
331 pub fn status(&self) -> Subscriber<RoomPaginationStatus> {
332 self.inner.pagination_status.subscribe()
333 }
334}