matrix_sdk/event_cache/pagination.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A sub-object for running pagination tasks on a given room.

use std::{sync::Arc, time::Duration};

use eyeball::{SharedObservable, Subscriber};
use matrix_sdk_base::{
    deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier, timeout::timeout,
};
use matrix_sdk_common::linked_chunk::ChunkContent;
use ruma::api::Direction;
use tokio::sync::RwLockWriteGuard;
use tracing::{debug, instrument, trace};

use super::{
    deduplicator::DeduplicationOutcome,
    room::{events::Gap, LoadMoreEventsBackwardsOutcome, RoomEventCacheInner},
    BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheState, RoomEventCacheUpdate,
};
use crate::{event_cache::EventCacheError, room::MessagesOptions};

/// Status for the back-pagination on a room event cache.
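///
/// A sketch of reacting to a status value (how the status is received is
/// elided here; see [`RoomPagination::status()`] below — the import path in
/// this example is an assumption):
///
/// ```no_run
/// # use matrix_sdk::event_cache::RoomPaginationStatus; // Import path assumed.
/// # fn on_update(status: RoomPaginationStatus) {
/// match status {
///     RoomPaginationStatus::Paginating => { /* show a spinner */ }
///     RoomPaginationStatus::Idle { hit_timeline_start: true } => { /* hide the "load more" button */ }
///     RoomPaginationStatus::Idle { hit_timeline_start: false } => { /* allow another back-pagination */ }
/// }
/// # }
/// ```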
#[derive(Debug, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum RoomPaginationStatus {
    /// No back-pagination is happening right now.
    Idle {
        /// Have we hit the start of the timeline, i.e. back-paginating wouldn't
        /// have any effect?
        hit_timeline_start: bool,
    },

    /// Back-pagination is already running in the background.
    Paginating,
}

/// Small RAII guard to reset the pagination status on drop, if it hasn't been
/// disarmed in the meantime.
struct ResetStatusOnDrop {
    prev_status: Option<RoomPaginationStatus>,
    pagination_status: SharedObservable<RoomPaginationStatus>,
}

impl ResetStatusOnDrop {
    /// Make the RAII guard have no effect.
    fn disarm(mut self) {
        self.prev_status = None;
    }
}

impl Drop for ResetStatusOnDrop {
    fn drop(&mut self) {
        if let Some(status) = self.prev_status.take() {
            let _ = self.pagination_status.set(status);
        }
    }
}

/// An API object to run pagination queries on a [`super::RoomEventCache`].
///
/// Can be created with [`super::RoomEventCache::pagination()`].
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct RoomPagination {
    pub(super) inner: Arc<RoomEventCacheInner>,
}

impl RoomPagination {
    /// Starts a back-pagination for the requested number of events.
    ///
    /// This automatically takes care of waiting for a pagination token from
    /// sync, if we haven't done that before.
    ///
    /// It will run multiple back-paginations until one of these two conditions
    /// is met:
    /// - either we've reached the start of the timeline,
    /// - or we've obtained enough events to fulfill the requested number of
    ///   events.
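    ///
    /// A usage sketch (how the [`super::RoomEventCache`] is obtained is
    /// elided, and the import path is an assumption):
    ///
    /// ```no_run
    /// # async fn example(
    /// #     room_event_cache: matrix_sdk::event_cache::RoomEventCache,
    /// # ) -> Result<(), Box<dyn std::error::Error>> {
    /// let pagination = room_event_cache.pagination();
    ///
    /// // Keep back-paginating until at least 20 events have been obtained, or
    /// // the start of the timeline has been reached.
    /// let outcome = pagination.run_backwards_until(20).await?;
    ///
    /// if outcome.reached_start {
    ///     // There's nothing left to paginate before these events.
    /// }
    /// # Ok(())
    /// # }
    /// ```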
    #[instrument(skip(self))]
    pub async fn run_backwards_until(
        &self,
        num_requested_events: u16,
    ) -> Result<BackPaginationOutcome> {
        let mut events = Vec::new();

        loop {
            if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
                events.extend(outcome.events);
                if outcome.reached_start || events.len() >= num_requested_events as usize {
                    return Ok(BackPaginationOutcome {
                        reached_start: outcome.reached_start,
                        events,
                    });
                }
                trace!("restarting back-pagination, because we haven't reached the start or obtained enough events yet");
                // This restart isn't caused by a timeline reset; skip the log below.
                continue;
            }

            debug!("restarting back-pagination because of a timeline reset.");
        }
    }

    /// Run a single back-pagination for the requested number of events.
    ///
    /// This automatically takes care of waiting for a pagination token from
    /// sync, if we haven't done that before.
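    ///
    /// A usage sketch (import path assumed, as in
    /// [`Self::run_backwards_until`]):
    ///
    /// ```no_run
    /// # async fn example(
    /// #     room_event_cache: matrix_sdk::event_cache::RoomEventCache,
    /// # ) -> Result<(), Box<dyn std::error::Error>> {
    /// // Run a single pagination of up to 20 events, from storage or network.
    /// let outcome = room_event_cache.pagination().run_backwards_once(20).await?;
    /// println!("obtained {} events", outcome.events.len());
    /// # Ok(())
    /// # }
    /// ```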
    #[instrument(skip(self))]
    pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
        loop {
            if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
                return Ok(outcome);
            }
            debug!("restarting back-pagination because of a timeline reset.");
        }
    }

    /// Paginate from either the storage or the network, and let pagination
    /// status observers know about updates.
    async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
        // There is at least one gap that must be resolved; reach the network.
        // First, ensure there's no other ongoing back-pagination.
        let status_observable = &self.inner.pagination_status;

        let prev_status = status_observable.set(RoomPaginationStatus::Paginating);
        if !matches!(prev_status, RoomPaginationStatus::Idle { .. }) {
            return Err(EventCacheError::AlreadyBackpaginating);
        }

        let reset_status_on_drop_guard = ResetStatusOnDrop {
            prev_status: Some(prev_status),
            pagination_status: status_observable.clone(),
        };

        match self.paginate_backwards_impl(batch_size).await? {
            Some(outcome) => {
                // Back-pagination's over and successful, don't reset the status to the previous
                // value.
                reset_status_on_drop_guard.disarm();

                // Notify subscribers that pagination ended.
                status_observable
                    .set(RoomPaginationStatus::Idle { hit_timeline_start: outcome.reached_start });

                Ok(Some(outcome))
            }

            None => {
                // We keep the previous status value, because we haven't obtained more
                // information about the pagination.
                Ok(None)
            }
        }
    }

    /// Paginate from either the storage or the network.
    ///
    /// This method isn't concerned with setting the pagination status; only the
    /// caller is.
    async fn paginate_backwards_impl(
        &self,
        batch_size: u16,
    ) -> Result<Option<BackPaginationOutcome>> {
        // A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
        // to load from storage first, then from network if storage indicated
        // there's no previous events chunk to load.

        loop {
            let mut state_guard = self.inner.state.write().await;

            match state_guard.load_more_events_backwards().await? {
                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
                    const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);

                    // Release the state guard while waiting, to not deadlock the sync task.
                    drop(state_guard);

                    // Wait for a notification that we received a previous-batch token.
                    trace!("waiting for a pagination token…");
                    let _ = timeout(
                        self.inner.pagination_batch_token_notifier.notified(),
                        DEFAULT_WAIT_FOR_TOKEN_DURATION,
                    )
                    .await;
                    trace!("done waiting");

                    self.inner.state.write().await.waited_for_initial_prev_token = true;

                    // Retry!
                    //
                    // Note: the next call to `load_more_events_backwards` can't return
                    // `WaitForInitialPrevToken`, because we've just set
                    // `waited_for_initial_prev_token` to true, so this is not an infinite
                    // loop.
                    //
                    // Note 2: not a recursive call, because recursive and async have a bad time
                    // together.
                    continue;
                }

                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
                    // We have a gap, so resolve it with a network back-pagination.
                    drop(state_guard);
                    return self.paginate_backwards_with_network(batch_size, prev_token).await;
                }

                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
                    return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
                }

                LoadMoreEventsBackwardsOutcome::Events {
                    events,
                    timeline_event_diffs,
                    reached_start,
                } => {
                    if !timeline_event_diffs.is_empty() {
                        let _ =
                            self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                                diffs: timeline_event_diffs,
                                origin: EventsOrigin::Cache,
                            });
                    }

                    return Ok(Some(BackPaginationOutcome {
                        reached_start,
                        // This is a backwards pagination. `BackPaginationOutcome` expects events to
                        // be in “reverse order”.
                        events: events.into_iter().rev().collect(),
                    }));
                }
            }
        }
    }

    /// Run a single pagination request (/messages) to the server.
    ///
    /// If there's no previous-batch token, this will wait for a short while to
    /// get one from sync. If it has already waited before, or if it has seen a
    /// previous-batch token already, it will immediately indicate that it has
    /// reached the end of the timeline.
    async fn paginate_backwards_with_network(
        &self,
        batch_size: u16,
        prev_token: Option<String>,
    ) -> Result<Option<BackPaginationOutcome>> {
        let (events, new_gap) = {
            let Some(room) = self.inner.weak_room.get() else {
                // The client is shutting down, return an empty default response.
                return Ok(Some(BackPaginationOutcome {
                    reached_start: false,
                    events: Default::default(),
                }));
            };

            let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
            options.limit = batch_size.into();

            let response = room.messages(options).await.map_err(|err| {
                EventCacheError::BackpaginationError(
                    crate::event_cache::paginator::PaginatorError::SdkError(Box::new(err)),
                )
            })?;

            let new_gap = response.end.map(|prev_token| Gap { prev_token });

            (response.chunk, new_gap)
        };

        // Make sure the `RoomEvents` isn't updated while we are saving events from
        // backpagination.
        let state = self.inner.state.write().await;

        // Check that the previous token still exists; otherwise it's a sign that the
        // room's timeline has been cleared.
        let prev_gap_chunk_id = if let Some(token) = prev_token {
            let gap_chunk_id = state.events().chunk_identifier(|chunk| {
                matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
            });

            if gap_chunk_id.is_none() {
                // We got a previous-batch token from the linked chunk *before* running the
                // request, but it is missing *after* completing the request.
                //
                // It may be a sign the linked chunk has been reset, but it's fine, per this
                // function's contract.
                return Ok(None);
            }

            gap_chunk_id
        } else {
            None
        };

        self.handle_network_pagination_result(state, events, new_gap, prev_gap_chunk_id)
            .await
            .map(Some)
    }

    /// Handle the result of a successful network back-pagination.
    async fn handle_network_pagination_result(
        &self,
        mut state: RwLockWriteGuard<'_, RoomEventCacheState>,
        events: Vec<TimelineEvent>,
        new_gap: Option<Gap>,
        prev_gap_id: Option<ChunkIdentifier>,
    ) -> Result<BackPaginationOutcome> {
        // If there's no new previous gap, then we've reached the start of the timeline.
        let network_reached_start = new_gap.is_none();

        let (
            DeduplicationOutcome {
                all_events: mut events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
            },
            all_duplicates,
        ) = state.collect_valid_and_duplicated_events(events).await?;

        // If not all the events have been back-paginated, we need to remove the
        // previous ones, otherwise we can end up with misordered events.
        //
        // Consider the following scenario:
        // - sync returns [D, E, F]
        // - then sync returns [] with a previous batch token PB1, so the internal
        //   linked chunk state is [D, E, F, PB1].
        // - back-paginating with PB1 may return [A, B, C, D, E, F].
        //
        // Only inserting the new events when replacing PB1 would result in a timeline
        // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
        // all the events, in case this happens (see also #4746).

        let mut event_diffs = if !all_duplicates {
            // Let's forget all the previous events.
            state
                .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?
        } else {
            // All new events are duplicated, they can all be ignored.
            events.clear();
            Default::default()
        };

        let next_diffs = state
            .with_events_mut(|room_events| {
                // Reverse the order of the events as `/messages` has been called with `dir=b`
                // (backwards). The `RoomEvents` API expects the first event to be the oldest.
                // Let's re-order them for this block.
                let reversed_events = events.iter().rev().cloned().collect::<Vec<_>>();

                let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos);

                // First, insert events.
                let insert_new_gap_pos = if let Some(gap_id) = prev_gap_id {
                    // There is a prior gap, let's replace it by new events!
                    if all_duplicates {
                        assert!(reversed_events.is_empty());
                    }

                    trace!("replacing previous gap with the back-paginated events");

                    // Replace the gap with the events we just deduplicated. This might get rid
                    // of the underlying gap, if the conditions are favorable to us.
                    room_events
                        .replace_gap_at(reversed_events.clone(), gap_id)
                        .expect("gap_identifier is a valid chunk id we read previously")
                } else if let Some(pos) = first_event_pos {
                    // No prior gap, but we had some events: assume we need to prepend events
                    // before those.
                    trace!("inserted events before the first known event");

                    room_events
                        .insert_events_at(reversed_events.clone(), pos)
                        .expect("pos is a valid position we just read above");

                    Some(pos)
                } else {
                    // No prior gap, and no prior events: push the events.
                    trace!("pushing events received from back-pagination");

                    room_events.push_events(reversed_events.clone());

                    // A new gap may be inserted before the new events, if there are any.
                    room_events.events().next().map(|(item_pos, _)| item_pos)
                };

                // And insert the new gap if needs be.
                //
                // We only do this when at least one new, non-duplicated event has been added
                // to the chunk. Otherwise it means we've back-paginated all the known events.
                if !all_duplicates {
                    if let Some(new_gap) = new_gap {
                        if let Some(new_pos) = insert_new_gap_pos {
                            room_events
                                .insert_gap_at(new_gap, new_pos)
                                .expect("events_chunk_pos represents a valid chunk position");
                        } else {
                            room_events.push_gap(new_gap);
                        }
                    }
                } else {
                    debug!("not storing previous batch token, because we deduplicated all new back-paginated events");
                }

                reversed_events
            })
            .await?;

        event_diffs.extend(next_diffs);

        // There could be an inconsistency between the network (which thinks we hit the
        // start of the timeline) and the disk (which has the initial empty
        // chunks), so tweak the `reached_start` value so that it reflects the disk
        // state in priority instead.
        let reached_start = {
            // There are no gaps.
            let has_gaps = state.events().chunks().any(|chunk| chunk.is_gap());

            // The first chunk has no predecessors.
            let first_chunk_is_definitive_head =
                state.events().chunks().next().map(|chunk| chunk.is_definitive_head());

            let reached_start =
                !has_gaps && first_chunk_is_definitive_head.unwrap_or(network_reached_start);

            trace!(
                ?network_reached_start,
                ?has_gaps,
                ?first_chunk_is_definitive_head,
                ?reached_start,
                "finished handling network back-pagination"
            );

            reached_start
        };

        let backpagination_outcome = BackPaginationOutcome { events, reached_start };

        if !event_diffs.is_empty() {
            let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: event_diffs,
                origin: EventsOrigin::Pagination,
            });
        }

        Ok(backpagination_outcome)
    }

    /// Returns a subscriber to the pagination status used for the
    /// back-pagination integrated to the event cache.
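    ///
    /// A sketch of observing status updates; the returned [`Subscriber`]
    /// implements `Stream`, so `StreamExt::next` can be used (the import path
    /// for `RoomEventCache` is an assumption):
    ///
    /// ```no_run
    /// # async fn example(room_event_cache: matrix_sdk::event_cache::RoomEventCache) {
    /// use futures_util::StreamExt as _;
    ///
    /// let mut status = room_event_cache.pagination().status();
    ///
    /// while let Some(update) = status.next().await {
    ///     // React to `Paginating` or `Idle { hit_timeline_start }` here.
    /// }
    /// # }
    /// ```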
    pub fn status(&self) -> Subscriber<RoomPaginationStatus> {
        self.inner.pagination_status.subscribe()
    }
}

/// Pagination token data, indicating the current state of the pagination.
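///
/// Converting from an optional server-provided batch token is a simple mapping
/// (see the `From<Option<String>>` impl at the bottom of this file; the import
/// path in this sketch is an assumption):
///
/// ```no_run
/// # use matrix_sdk::event_cache::pagination::PaginationToken; // Path assumed.
/// assert_eq!(PaginationToken::from(None::<String>), PaginationToken::None);
/// assert_eq!(
///     PaginationToken::from(Some("tok".to_owned())),
///     PaginationToken::HasMore("tok".to_owned()),
/// );
/// ```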
#[derive(Clone, Debug, PartialEq)]
pub enum PaginationToken {
    /// We never had a pagination token, so we'll start back-paginating from the
    /// end, or forward-paginating from the start.
    None,
    /// We paginated once before, and we received a prev/next batch token that
    /// we may reuse for the next query.
    HasMore(String),
    /// We've hit one end of the timeline (either the start or the actual end),
    /// so there's no need to continue paginating.
    HitEnd,
}

impl From<Option<String>> for PaginationToken {
    fn from(token: Option<String>) -> Self {
        match token {
            Some(val) => Self::HasMore(val),
            None => Self::None,
        }
    }
}