matrix_sdk/event_cache/pagination.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A sub-object for running pagination tasks on a given room.
16
17use std::{sync::Arc, time::Duration};
18
19use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk_base::timeout::timeout;
21use ruma::api::Direction;
22use tracing::{debug, instrument, trace};
23
24use super::{
25 room::{LoadMoreEventsBackwardsOutcome, RoomEventCacheInner},
26 BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheUpdate,
27};
28use crate::{
29 event_cache::{EventCacheError, RoomEventCacheGenericUpdate},
30 room::MessagesOptions,
31};
32
/// The state of the back-pagination of a room event cache, as exposed to
/// observers.
#[derive(Clone, Copy, Debug, PartialEq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum RoomPaginationStatus {
    /// No back-pagination is currently in progress.
    Idle {
        /// Whether the start of the timeline has been hit, in which case
        /// back-paginating again wouldn't have any effect.
        hit_timeline_start: bool,
    },

    /// A back-pagination is already running in the background.
    Paginating,
}
47
48/// Small RAII guard to reset the pagination status on drop, if not disarmed in
49/// the meanwhile.
50struct ResetStatusOnDrop {
51 prev_status: Option<RoomPaginationStatus>,
52 pagination_status: SharedObservable<RoomPaginationStatus>,
53}
54
55impl ResetStatusOnDrop {
56 /// Make the RAII guard have no effect.
57 fn disarm(mut self) {
58 self.prev_status = None;
59 }
60}
61
62impl Drop for ResetStatusOnDrop {
63 fn drop(&mut self) {
64 if let Some(status) = self.prev_status.take() {
65 let _ = self.pagination_status.set(status);
66 }
67 }
68}
69
70/// An API object to run pagination queries on a [`super::RoomEventCache`].
71///
72/// Can be created with [`super::RoomEventCache::pagination()`].
73#[allow(missing_debug_implementations)]
74#[derive(Clone)]
75pub struct RoomPagination {
76 pub(super) inner: Arc<RoomEventCacheInner>,
77}
78
79impl RoomPagination {
80 /// Starts a back-pagination for the requested number of events.
81 ///
82 /// This automatically takes care of waiting for a pagination token from
83 /// sync, if we haven't done that before.
84 ///
85 /// It will run multiple back-paginations until one of these two conditions
86 /// is met:
87 /// - either we've reached the start of the timeline,
88 /// - or we've obtained enough events to fulfill the requested number of
89 /// events.
90 #[instrument(skip(self))]
91 pub async fn run_backwards_until(
92 &self,
93 num_requested_events: u16,
94 ) -> Result<BackPaginationOutcome> {
95 let mut events = Vec::new();
96
97 loop {
98 if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
99 events.extend(outcome.events);
100 if outcome.reached_start || events.len() >= num_requested_events as usize {
101 return Ok(BackPaginationOutcome {
102 reached_start: outcome.reached_start,
103 events,
104 });
105 }
106 trace!(
107 "restarting back-pagination, because we haven't reached \
108 the start or obtained enough events yet"
109 );
110 }
111
112 debug!("restarting back-pagination because of a timeline reset.");
113 }
114 }
115
116 /// Run a single back-pagination for the requested number of events.
117 ///
118 /// This automatically takes care of waiting for a pagination token from
119 /// sync, if we haven't done that before.
120 #[instrument(skip(self))]
121 pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
122 loop {
123 if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
124 return Ok(outcome);
125 }
126 debug!("restarting back-pagination because of a timeline reset.");
127 }
128 }
129
130 /// Paginate from either the storage or the network, and let pagination
131 /// status observers know about updates.
132 async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
133 // There is at least one gap that must be resolved; reach the network.
134 // First, ensure there's no other ongoing back-pagination.
135 let status_observable = &self.inner.pagination_status;
136
137 let prev_status = status_observable.set(RoomPaginationStatus::Paginating);
138 if !matches!(prev_status, RoomPaginationStatus::Idle { .. }) {
139 return Err(EventCacheError::AlreadyBackpaginating);
140 }
141
142 let reset_status_on_drop_guard = ResetStatusOnDrop {
143 prev_status: Some(prev_status),
144 pagination_status: status_observable.clone(),
145 };
146
147 match self.paginate_backwards_impl(batch_size).await? {
148 Some(outcome) => {
149 // Back-pagination's over and successful, don't reset the status to the previous
150 // value.
151 reset_status_on_drop_guard.disarm();
152
153 // Notify subscribers that pagination ended.
154 status_observable
155 .set(RoomPaginationStatus::Idle { hit_timeline_start: outcome.reached_start });
156
157 Ok(Some(outcome))
158 }
159
160 None => {
161 // We keep the previous status value, because we haven't obtained more
162 // information about the pagination.
163 Ok(None)
164 }
165 }
166 }
167
168 /// Paginate from either the storage or the network.
169 ///
170 /// This method isn't concerned with setting the pagination status; only the
171 /// caller is.
172 async fn paginate_backwards_impl(
173 &self,
174 batch_size: u16,
175 ) -> Result<Option<BackPaginationOutcome>> {
176 // A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
177 // to load from storage first, then from network if storage indicated
178 // there's no previous events chunk to load.
179
180 loop {
181 let mut state_guard = self.inner.state.write().await;
182
183 match state_guard.load_more_events_backwards().await? {
184 LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
185 const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
186
187 // Release the state guard while waiting, to not deadlock the sync task.
188 drop(state_guard);
189
190 // Otherwise, wait for a notification that we received a previous-batch token.
191 trace!("waiting for a pagination token…");
192 let _ = timeout(
193 self.inner.pagination_batch_token_notifier.notified(),
194 DEFAULT_WAIT_FOR_TOKEN_DURATION,
195 )
196 .await;
197 trace!("done waiting");
198
199 self.inner.state.write().await.waited_for_initial_prev_token = true;
200
201 // Retry!
202 //
203 // Note: the next call to `load_more_events_backwards` can't return
204 // `WaitForInitialPrevToken` because we've just set to
205 // `waited_for_initial_prev_token`, so this is not an infinite loop.
206 //
207 // Note 2: not a recursive call, because recursive and async have a bad time
208 // together.
209 continue;
210 }
211
212 LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
213 // We have a gap, so resolve it with a network back-pagination.
214 drop(state_guard);
215 return self.paginate_backwards_with_network(batch_size, prev_token).await;
216 }
217
218 LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
219 return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
220 }
221
222 LoadMoreEventsBackwardsOutcome::Events {
223 events,
224 timeline_event_diffs,
225 reached_start,
226 } => {
227 if !timeline_event_diffs.is_empty() {
228 let _ =
229 self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
230 diffs: timeline_event_diffs,
231 origin: EventsOrigin::Cache,
232 });
233
234 // Send a room event cache generic update.
235 let _ =
236 self.inner.generic_update_sender.send(RoomEventCacheGenericUpdate {
237 room_id: self.inner.room_id.clone(),
238 });
239 }
240
241 return Ok(Some(BackPaginationOutcome {
242 reached_start,
243 // This is a backwards pagination. `BackPaginationOutcome` expects events to
244 // be in “reverse order”.
245 events: events.into_iter().rev().collect(),
246 }));
247 }
248 }
249 }
250 }
251
252 /// Run a single pagination request (/messages) to the server.
253 ///
254 /// If there are no previous-batch tokens, it will wait for one for a short
255 /// while to get one, or if it's already done so or if it's seen a
256 /// previous-batch token before, it will immediately indicate it's
257 /// reached the end of the timeline.
258 async fn paginate_backwards_with_network(
259 &self,
260 batch_size: u16,
261 prev_token: Option<String>,
262 ) -> Result<Option<BackPaginationOutcome>> {
263 let (events, new_token) = {
264 let Some(room) = self.inner.weak_room.get() else {
265 // The client is shutting down, return an empty default response.
266 return Ok(Some(BackPaginationOutcome {
267 reached_start: false,
268 events: Default::default(),
269 }));
270 };
271
272 let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
273 options.limit = batch_size.into();
274
275 let response = room
276 .messages(options)
277 .await
278 .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
279
280 (response.chunk, response.end)
281 };
282
283 if let Some((outcome, timeline_event_diffs)) = self
284 .inner
285 .state
286 .write()
287 .await
288 .handle_backpagination(events, new_token, prev_token)
289 .await?
290 {
291 if !timeline_event_diffs.is_empty() {
292 let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
293 diffs: timeline_event_diffs,
294 origin: EventsOrigin::Pagination,
295 });
296 }
297
298 Ok(Some(outcome))
299 } else {
300 // The previous token has gone missing, so the timeline has been reset in the
301 // meanwhile, but it's fine per this function's contract.
302 Ok(None)
303 }
304 }
305
306 /// Returns a subscriber to the pagination status used for the
307 /// back-pagination integrated to the event cache.
308 pub fn status(&self) -> Subscriber<RoomPaginationStatus> {
309 self.inner.pagination_status.subscribe()
310 }
311}