// matrix_sdk/event_cache/pagination.rs
// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15//! A sub-object for running pagination tasks on a given room.
16
17use std::{sync::Arc, time::Duration};
18
19use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk_base::{
21 deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier, timeout::timeout,
22};
23use matrix_sdk_common::linked_chunk::ChunkContent;
24use ruma::api::Direction;
25use tokio::sync::RwLockWriteGuard;
26use tracing::{debug, instrument, trace};
27
28use super::{
29 deduplicator::DeduplicationOutcome,
30 room::{events::Gap, LoadMoreEventsBackwardsOutcome, RoomEventCacheInner},
31 BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheState, RoomEventCacheUpdate,
32};
33use crate::{event_cache::EventCacheError, room::MessagesOptions};
34
/// Status for the back-pagination on a room event cache.
#[derive(Debug, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum RoomPaginationStatus {
    /// No back-pagination is happening right now.
    Idle {
        /// Have we hit the start of the timeline, i.e. back-paginating wouldn't
        /// have any effect?
        hit_timeline_start: bool,
    },

    /// Back-pagination is already running in the background.
    Paginating,
}

50/// Small RAII guard to reset the pagination status on drop, if not disarmed in
51/// the meanwhile.
52struct ResetStatusOnDrop {
53 prev_status: Option<RoomPaginationStatus>,
54 pagination_status: SharedObservable<RoomPaginationStatus>,
55}
56
57impl ResetStatusOnDrop {
58 /// Make the RAII guard have no effect.
59 fn disarm(mut self) {
60 self.prev_status = None;
61 }
62}
63
64impl Drop for ResetStatusOnDrop {
65 fn drop(&mut self) {
66 if let Some(status) = self.prev_status.take() {
67 let _ = self.pagination_status.set(status);
68 }
69 }
70}
71
72/// An API object to run pagination queries on a [`super::RoomEventCache`].
73///
74/// Can be created with [`super::RoomEventCache::pagination()`].
75#[allow(missing_debug_implementations)]
76#[derive(Clone)]
77pub struct RoomPagination {
78 pub(super) inner: Arc<RoomEventCacheInner>,
79}
80
81impl RoomPagination {
82 /// Starts a back-pagination for the requested number of events.
83 ///
84 /// This automatically takes care of waiting for a pagination token from
85 /// sync, if we haven't done that before.
86 ///
87 /// It will run multiple back-paginations until one of these two conditions
88 /// is met:
89 /// - either we've reached the start of the timeline,
90 /// - or we've obtained enough events to fulfill the requested number of
91 /// events.
92 #[instrument(skip(self))]
93 pub async fn run_backwards_until(
94 &self,
95 num_requested_events: u16,
96 ) -> Result<BackPaginationOutcome> {
97 let mut events = Vec::new();
98
99 loop {
100 if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
101 events.extend(outcome.events);
102 if outcome.reached_start || events.len() >= num_requested_events as usize {
103 return Ok(BackPaginationOutcome {
104 reached_start: outcome.reached_start,
105 events,
106 });
107 }
108 trace!("restarting back-pagination, because we haven't reached the start or obtained enough events yet");
109 }
110
111 debug!("restarting back-pagination because of a timeline reset.");
112 }
113 }
114
115 /// Run a single back-pagination for the requested number of events.
116 ///
117 /// This automatically takes care of waiting for a pagination token from
118 /// sync, if we haven't done that before.
119 #[instrument(skip(self))]
120 pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
121 loop {
122 if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
123 return Ok(outcome);
124 }
125 debug!("restarting back-pagination because of a timeline reset.");
126 }
127 }
128
129 /// Paginate from either the storage or the network, and let pagination
130 /// status observers know about updates.
131 async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
132 // There is at least one gap that must be resolved; reach the network.
133 // First, ensure there's no other ongoing back-pagination.
134 let status_observable = &self.inner.pagination_status;
135
136 let prev_status = status_observable.set(RoomPaginationStatus::Paginating);
137 if !matches!(prev_status, RoomPaginationStatus::Idle { .. }) {
138 return Err(EventCacheError::AlreadyBackpaginating);
139 }
140
141 let reset_status_on_drop_guard = ResetStatusOnDrop {
142 prev_status: Some(prev_status),
143 pagination_status: status_observable.clone(),
144 };
145
146 match self.paginate_backwards_impl(batch_size).await? {
147 Some(outcome) => {
148 // Back-pagination's over and successful, don't reset the status to the previous
149 // value.
150 reset_status_on_drop_guard.disarm();
151
152 // Notify subscribers that pagination ended.
153 status_observable
154 .set(RoomPaginationStatus::Idle { hit_timeline_start: outcome.reached_start });
155
156 Ok(Some(outcome))
157 }
158
159 None => {
160 // We keep the previous status value, because we haven't obtained more
161 // information about the pagination.
162 Ok(None)
163 }
164 }
165 }
166
167 /// Paginate from either the storage or the network.
168 ///
169 /// This method isn't concerned with setting the pagination status; only the
170 /// caller is.
171 async fn paginate_backwards_impl(
172 &self,
173 batch_size: u16,
174 ) -> Result<Option<BackPaginationOutcome>> {
175 // A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
176 // to load from storage first, then from network if storage indicated
177 // there's no previous events chunk to load.
178
179 loop {
180 let mut state_guard = self.inner.state.write().await;
181
182 match state_guard.load_more_events_backwards().await? {
183 LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
184 const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
185
186 // Release the state guard while waiting, to not deadlock the sync task.
187 drop(state_guard);
188
189 // Otherwise, wait for a notification that we received a previous-batch token.
190 trace!("waiting for a pagination token…");
191 let _ = timeout(
192 self.inner.pagination_batch_token_notifier.notified(),
193 DEFAULT_WAIT_FOR_TOKEN_DURATION,
194 )
195 .await;
196 trace!("done waiting");
197
198 self.inner.state.write().await.waited_for_initial_prev_token = true;
199
200 // Retry!
201 //
202 // Note: the next call to `load_more_events_backwards` can't return
203 // `WaitForInitialPrevToken` because we've just set to
204 // `waited_for_initial_prev_token`, so this is not an infinite loop.
205 //
206 // Note 2: not a recursive call, because recursive and async have a bad time
207 // together.
208 continue;
209 }
210
211 LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
212 // We have a gap, so resolve it with a network back-pagination.
213 drop(state_guard);
214 return self.paginate_backwards_with_network(batch_size, prev_token).await;
215 }
216
217 LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
218 return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
219 }
220
221 LoadMoreEventsBackwardsOutcome::Events {
222 events,
223 timeline_event_diffs,
224 reached_start,
225 } => {
226 if !timeline_event_diffs.is_empty() {
227 let _ =
228 self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
229 diffs: timeline_event_diffs,
230 origin: EventsOrigin::Cache,
231 });
232 }
233
234 return Ok(Some(BackPaginationOutcome {
235 reached_start,
236 // This is a backwards pagination. `BackPaginationOutcome` expects events to
237 // be in “reverse order”.
238 events: events.into_iter().rev().collect(),
239 }));
240 }
241 }
242 }
243 }
244
245 /// Run a single pagination request (/messages) to the server.
246 ///
247 /// If there are no previous-batch tokens, it will wait for one for a short
248 /// while to get one, or if it's already done so or if it's seen a
249 /// previous-batch token before, it will immediately indicate it's
250 /// reached the end of the timeline.
251 async fn paginate_backwards_with_network(
252 &self,
253 batch_size: u16,
254 prev_token: Option<String>,
255 ) -> Result<Option<BackPaginationOutcome>> {
256 let (events, new_gap) = {
257 let Some(room) = self.inner.weak_room.get() else {
258 // The client is shutting down, return an empty default response.
259 return Ok(Some(BackPaginationOutcome {
260 reached_start: false,
261 events: Default::default(),
262 }));
263 };
264
265 let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
266 options.limit = batch_size.into();
267
268 let response = room.messages(options).await.map_err(|err| {
269 EventCacheError::BackpaginationError(
270 crate::event_cache::paginator::PaginatorError::SdkError(Box::new(err)),
271 )
272 })?;
273
274 let new_gap = response.end.map(|prev_token| Gap { prev_token });
275
276 (response.chunk, new_gap)
277 };
278
279 // Make sure the `RoomEvents` isn't updated while we are saving events from
280 // backpagination.
281 let state = self.inner.state.write().await;
282
283 // Check that the previous token still exists; otherwise it's a sign that the
284 // room's timeline has been cleared.
285 let prev_gap_chunk_id = if let Some(token) = prev_token {
286 let gap_chunk_id = state.events().chunk_identifier(|chunk| {
287 matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
288 });
289
290 if gap_chunk_id.is_none() {
291 // We got a previous-batch token from the linked chunk *before* running the
292 // request, but it is missing *after* completing the
293 // request.
294 //
295 // It may be a sign the linked chunk has been reset, but it's fine, per this
296 // function's contract.
297 return Ok(None);
298 }
299
300 gap_chunk_id
301 } else {
302 None
303 };
304
305 self.handle_network_pagination_result(state, events, new_gap, prev_gap_chunk_id)
306 .await
307 .map(Some)
308 }
309
310 /// Handle the result of a successful network back-pagination.
311 async fn handle_network_pagination_result(
312 &self,
313 mut state: RwLockWriteGuard<'_, RoomEventCacheState>,
314 events: Vec<TimelineEvent>,
315 new_gap: Option<Gap>,
316 prev_gap_id: Option<ChunkIdentifier>,
317 ) -> Result<BackPaginationOutcome> {
318 // If there's no new previous gap, then we've reached the start of the timeline.
319 let network_reached_start = new_gap.is_none();
320
321 let (
322 DeduplicationOutcome {
323 all_events: mut events,
324 in_memory_duplicated_event_ids,
325 in_store_duplicated_event_ids,
326 },
327 all_duplicates,
328 ) = state.collect_valid_and_duplicated_events(events).await?;
329
330 // If not all the events have been back-paginated, we need to remove the
331 // previous ones, otherwise we can end up with misordered events.
332 //
333 // Consider the following scenario:
334 // - sync returns [D, E, F]
335 // - then sync returns [] with a previous batch token PB1, so the internal
336 // linked chunk state is [D, E, F, PB1].
337 // - back-paginating with PB1 may return [A, B, C, D, E, F].
338 //
339 // Only inserting the new events when replacing PB1 would result in a timeline
340 // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
341 // all the events, in case this happens (see also #4746).
342
343 let mut event_diffs = if !all_duplicates {
344 // Let's forget all the previous events.
345 state
346 .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
347 .await?
348 } else {
349 // All new events are duplicated, they can all be ignored.
350 events.clear();
351 Default::default()
352 };
353
354 let next_diffs = state
355 .with_events_mut(|room_events| {
356 // Reverse the order of the events as `/messages` has been called with `dir=b`
357 // (backwards). The `RoomEvents` API expects the first event to be the oldest.
358 // Let's re-order them for this block.
359 let reversed_events = events
360 .iter()
361 .rev()
362 .cloned()
363 .collect::<Vec<_>>();
364
365 let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos);
366
367 // First, insert events.
368 let insert_new_gap_pos = if let Some(gap_id) = prev_gap_id {
369 // There is a prior gap, let's replace it by new events!
370 if all_duplicates {
371 assert!(reversed_events.is_empty());
372 }
373
374 trace!("replacing previous gap with the back-paginated events");
375
376 // Replace the gap with the events we just deduplicated. This might get rid of the
377 // underlying gap, if the conditions are favorable to us.
378 room_events.replace_gap_at(reversed_events.clone(), gap_id)
379 .expect("gap_identifier is a valid chunk id we read previously")
380 } else if let Some(pos) = first_event_pos {
381 // No prior gap, but we had some events: assume we need to prepend events
382 // before those.
383 trace!("inserted events before the first known event");
384
385 room_events
386 .insert_events_at(reversed_events.clone(), pos)
387 .expect("pos is a valid position we just read above");
388
389 Some(pos)
390 } else {
391 // No prior gap, and no prior events: push the events.
392 trace!("pushing events received from back-pagination");
393
394 room_events.push_events(reversed_events.clone());
395
396 // A new gap may be inserted before the new events, if there are any.
397 room_events.events().next().map(|(item_pos, _)| item_pos)
398 };
399
400 // And insert the new gap if needs be.
401 //
402 // We only do this when at least one new, non-duplicated event, has been added to
403 // the chunk. Otherwise it means we've back-paginated all the known events.
404 if !all_duplicates {
405 if let Some(new_gap) = new_gap {
406 if let Some(new_pos) = insert_new_gap_pos {
407 room_events
408 .insert_gap_at(new_gap, new_pos)
409 .expect("events_chunk_pos represents a valid chunk position");
410 } else {
411 room_events.push_gap(new_gap);
412 }
413 }
414 } else {
415 debug!("not storing previous batch token, because we deduplicated all new back-paginated events");
416 }
417
418 reversed_events
419 })
420 .await?;
421
422 event_diffs.extend(next_diffs);
423
424 // There could be an inconsistency between the network (which thinks we hit the
425 // start of the timeline) and the disk (which has the initial empty
426 // chunks), so tweak the `reached_start` value so that it reflects the disk
427 // state in priority instead.
428 let reached_start = {
429 // There are no gaps.
430 let has_gaps = state.events().chunks().any(|chunk| chunk.is_gap());
431
432 // The first chunk has no predecessors.
433 let first_chunk_is_definitive_head =
434 state.events().chunks().next().map(|chunk| chunk.is_definitive_head());
435
436 let reached_start =
437 !has_gaps && first_chunk_is_definitive_head.unwrap_or(network_reached_start);
438
439 trace!(
440 ?network_reached_start,
441 ?has_gaps,
442 ?first_chunk_is_definitive_head,
443 ?reached_start,
444 "finished handling network back-pagination"
445 );
446
447 reached_start
448 };
449
450 let backpagination_outcome = BackPaginationOutcome { events, reached_start };
451
452 if !event_diffs.is_empty() {
453 let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
454 diffs: event_diffs,
455 origin: EventsOrigin::Pagination,
456 });
457 }
458
459 Ok(backpagination_outcome)
460 }
461
462 /// Returns a subscriber to the pagination status used for the
463 /// back-pagination integrated to the event cache.
464 pub fn status(&self) -> Subscriber<RoomPaginationStatus> {
465 self.inner.pagination_status.subscribe()
466 }
467}
468
/// Pagination token data, indicating in which state is the current pagination.
#[derive(Clone, Debug, PartialEq)]
pub enum PaginationToken {
    /// We never had a pagination token, so we'll start back-paginating from the
    /// end, or forward-paginating from the start.
    None,
    /// We paginated once before, and we received a prev/next batch token that
    /// we may reuse for the next query.
    HasMore(String),
    /// We've hit one end of the timeline (either the start or the actual end),
    /// so there's no need to continue paginating.
    HitEnd,
}

impl From<Option<String>> for PaginationToken {
    /// Map a server-provided optional batch token into the token state:
    /// `Some` becomes [`PaginationToken::HasMore`], `None` stays
    /// [`PaginationToken::None`].
    fn from(token: Option<String>) -> Self {
        match token {
            Some(val) => Self::HasMore(val),
            None => Self::None,
        }
    }
}