matrix_sdk/event_cache/pagination.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A sub-object for running pagination tasks on a given room.
16
17use std::{sync::Arc, time::Duration};
18
19use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk_base::timeout::timeout;
21use ruma::api::Direction;
22use tracing::{debug, instrument, trace};
23
24use super::{
25 BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheUpdate,
26 room::{LoadMoreEventsBackwardsOutcome, RoomEventCacheInner},
27};
28use crate::{
29 event_cache::{EventCacheError, RoomEventCacheGenericUpdate},
30 room::MessagesOptions,
31};
32
33/// Status for the back-pagination on a room event cache.
34#[derive(Debug, PartialEq, Clone, Copy)]
35#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
36pub enum RoomPaginationStatus {
37 /// No back-pagination is happening right now.
38 Idle {
39 /// Have we hit the start of the timeline, i.e. back-paginating wouldn't
40 /// have any effect?
41 hit_timeline_start: bool,
42 },
43
44 /// Back-pagination is already running in the background.
45 Paginating,
46}
47
48/// Small RAII guard to reset the pagination status on drop, if not disarmed in
49/// the meanwhile.
50struct ResetStatusOnDrop {
51 prev_status: Option<RoomPaginationStatus>,
52 pagination_status: SharedObservable<RoomPaginationStatus>,
53}
54
55impl ResetStatusOnDrop {
56 /// Make the RAII guard have no effect.
57 fn disarm(mut self) {
58 self.prev_status = None;
59 }
60}
61
62impl Drop for ResetStatusOnDrop {
63 fn drop(&mut self) {
64 if let Some(status) = self.prev_status.take() {
65 let _ = self.pagination_status.set(status);
66 }
67 }
68}
69
70/// An API object to run pagination queries on a [`super::RoomEventCache`].
71///
72/// Can be created with [`super::RoomEventCache::pagination()`].
73#[allow(missing_debug_implementations)]
74#[derive(Clone)]
75pub struct RoomPagination {
76 pub(super) inner: Arc<RoomEventCacheInner>,
77}
78
79impl RoomPagination {
80 /// Starts a back-pagination for the requested number of events.
81 ///
82 /// This automatically takes care of waiting for a pagination token from
83 /// sync, if we haven't done that before.
84 ///
85 /// It will run multiple back-paginations until one of these two conditions
86 /// is met:
87 /// - either we've reached the start of the timeline,
88 /// - or we've obtained enough events to fulfill the requested number of
89 /// events.
90 #[instrument(skip(self))]
91 pub async fn run_backwards_until(
92 &self,
93 num_requested_events: u16,
94 ) -> Result<BackPaginationOutcome> {
95 let mut events = Vec::new();
96
97 loop {
98 if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
99 events.extend(outcome.events);
100 if outcome.reached_start || events.len() >= num_requested_events as usize {
101 return Ok(BackPaginationOutcome {
102 reached_start: outcome.reached_start,
103 events,
104 });
105 }
106 trace!(
107 "restarting back-pagination, because we haven't reached \
108 the start or obtained enough events yet"
109 );
110 }
111
112 debug!("restarting back-pagination because of a timeline reset.");
113 }
114 }
115
116 /// Run a single back-pagination for the requested number of events.
117 ///
118 /// This automatically takes care of waiting for a pagination token from
119 /// sync, if we haven't done that before.
120 #[instrument(skip(self))]
121 pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
122 loop {
123 if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
124 return Ok(outcome);
125 }
126 debug!("restarting back-pagination because of a timeline reset.");
127 }
128 }
129
130 /// Paginate from either the storage or the network, and let pagination
131 /// status observers know about updates.
132 ///
133 /// Returns `Ok(None)` if the pagination token used during a network
134 /// pagination has disappeared from the in-memory linked chunk after
135 /// handling the response.
136 async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
137 // There is at least one gap that must be resolved; reach the network.
138 // First, ensure there's no other ongoing back-pagination.
139 let status_observable = &self.inner.pagination_status;
140
141 let prev_status = status_observable.set(RoomPaginationStatus::Paginating);
142 if !matches!(prev_status, RoomPaginationStatus::Idle { .. }) {
143 return Err(EventCacheError::AlreadyBackpaginating);
144 }
145
146 let reset_status_on_drop_guard = ResetStatusOnDrop {
147 prev_status: Some(prev_status),
148 pagination_status: status_observable.clone(),
149 };
150
151 match self.paginate_backwards_impl(batch_size).await? {
152 Some(outcome) => {
153 // Back-pagination's over and successful, don't reset the status to the previous
154 // value.
155 reset_status_on_drop_guard.disarm();
156
157 // Notify subscribers that pagination ended.
158 status_observable
159 .set(RoomPaginationStatus::Idle { hit_timeline_start: outcome.reached_start });
160
161 Ok(Some(outcome))
162 }
163
164 None => Ok(None),
165 }
166 }
167
168 /// Paginate from either the storage or the network.
169 ///
170 /// This method isn't concerned with setting the pagination status; only the
171 /// caller is.
172 ///
173 /// Returns `Ok(None)` if the pagination token used during a network
174 /// pagination has disappeared from the in-memory linked chunk after
175 /// handling the response.
176 async fn paginate_backwards_impl(
177 &self,
178 batch_size: u16,
179 ) -> Result<Option<BackPaginationOutcome>> {
180 // A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
181 // to load from storage first, then from network if storage indicated
182 // there's no previous events chunk to load.
183
184 loop {
185 let mut state_guard = self.inner.state.write().await;
186
187 match state_guard.load_more_events_backwards().await? {
188 LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
189 if prev_token.is_none() && !state_guard.waited_for_initial_prev_token {
190 // We didn't reload a pagination token, and we haven't waited for one; wait
191 // and start over.
192
193 const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
194
195 // Release the state guard while waiting, to not deadlock the sync task.
196 drop(state_guard);
197
198 // Otherwise, wait for a notification that we received a previous-batch
199 // token.
200 trace!("waiting for a pagination token…");
201 let _ = timeout(
202 self.inner.pagination_batch_token_notifier.notified(),
203 DEFAULT_WAIT_FOR_TOKEN_DURATION,
204 )
205 .await;
206 trace!("done waiting");
207
208 self.inner.state.write().await.waited_for_initial_prev_token = true;
209
210 // Retry!
211 //
212 // Note: the next call to `load_more_events_backwards` can't return
213 // `WaitForInitialPrevToken` because we've just set to
214 // `waited_for_initial_prev_token`, so this is not an infinite loop.
215 //
216 // Note 2: not a recursive call, because recursive and async have a bad time
217 // together.
218 continue;
219 }
220
221 // We have a gap, so resolve it with a network back-pagination.
222 drop(state_guard);
223 return self.paginate_backwards_with_network(batch_size, prev_token).await;
224 }
225
226 LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
227 return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
228 }
229
230 LoadMoreEventsBackwardsOutcome::Events {
231 events,
232 timeline_event_diffs,
233 reached_start,
234 } => {
235 if !timeline_event_diffs.is_empty() {
236 let _ =
237 self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
238 diffs: timeline_event_diffs,
239 origin: EventsOrigin::Cache,
240 });
241
242 // Send a room event cache generic update.
243 let _ =
244 self.inner.generic_update_sender.send(RoomEventCacheGenericUpdate {
245 room_id: self.inner.room_id.clone(),
246 });
247 }
248
249 return Ok(Some(BackPaginationOutcome {
250 reached_start,
251 // This is a backwards pagination. `BackPaginationOutcome` expects events to
252 // be in “reverse order”.
253 events: events.into_iter().rev().collect(),
254 }));
255 }
256 }
257 }
258 }
259
260 /// Run a single pagination request (/messages) to the server.
261 ///
262 /// If there are no previous-batch tokens, it will wait for one for a short
263 /// while to get one, or if it's already done so or if it's seen a
264 /// previous-batch token before, it will immediately indicate it's
265 /// reached the end of the timeline.
266 ///
267 /// Returns `Ok(None)` if the pagination token used during the request has
268 /// disappeared from the in-memory linked chunk after handling the
269 /// response.
270 async fn paginate_backwards_with_network(
271 &self,
272 batch_size: u16,
273 prev_token: Option<String>,
274 ) -> Result<Option<BackPaginationOutcome>> {
275 let (events, new_token) = {
276 let Some(room) = self.inner.weak_room.get() else {
277 // The client is shutting down, return an empty default response.
278 return Ok(Some(BackPaginationOutcome {
279 reached_start: false,
280 events: Default::default(),
281 }));
282 };
283
284 let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
285 options.limit = batch_size.into();
286
287 let response = room
288 .messages(options)
289 .await
290 .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
291
292 (response.chunk, response.end)
293 };
294
295 if let Some((outcome, timeline_event_diffs)) = self
296 .inner
297 .state
298 .write()
299 .await
300 .handle_backpagination(events, new_token, prev_token)
301 .await?
302 {
303 if !timeline_event_diffs.is_empty() {
304 let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
305 diffs: timeline_event_diffs,
306 origin: EventsOrigin::Pagination,
307 });
308 }
309
310 Ok(Some(outcome))
311 } else {
312 // The previous token has gone missing, so the timeline has been reset in the
313 // meanwhile, but it's fine per this function's contract.
314 Ok(None)
315 }
316 }
317
318 /// Returns a subscriber to the pagination status used for the
319 /// back-pagination integrated to the event cache.
320 pub fn status(&self) -> Subscriber<RoomPaginationStatus> {
321 self.inner.pagination_status.subscribe()
322 }
323}