// matrix_sdk/event_cache/caches/room/pagination.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The [`RoomPagination`] type makes it possible to paginate a
16//! [`RoomEventCache`].
17//!
18//! [`RoomEventCache`]: super::super::super::RoomEventCache
19
20use std::sync::Arc;
21
22use eyeball::{SharedObservable, Subscriber};
23use eyeball_im::VectorDiff;
24use matrix_sdk_base::{
25    event_cache::{Event, Gap},
26    linked_chunk::{ChunkContent, LinkedChunkId},
27};
28use ruma::api::Direction;
29
30pub use super::super::pagination::PaginationStatus;
31use super::{
32    super::{
33        super::{
34            EventCacheError, EventsOrigin, Result, RoomEventCacheGenericUpdate,
35            deduplicator::{DeduplicationOutcome, filter_duplicate_events},
36        },
37        TimelineVectorDiffs,
38        pagination::{
39            BackPaginationOutcome, LoadMoreEventsBackwardsOutcome, PaginatedCache, Pagination,
40        },
41    },
42    PostProcessingOrigin, RoomEventCacheInner, RoomEventCacheUpdate,
43};
44use crate::room::MessagesOptions;
45
/// An API object to run pagination queries on a [`RoomEventCache`].
///
/// Can be created with [`RoomEventCache::pagination()`].
///
/// [`RoomEventCache`]: super::super::super::RoomEventCache
/// [`RoomEventCache::pagination()`]: super::super::super::RoomEventCache::pagination
// NOTE(review): `Debug` is deliberately not derived — presumably because the
// wrapped `RoomEventCacheInner` doesn't implement it; confirm before deriving.
#[allow(missing_debug_implementations)]
#[derive(Clone)]
// Thin newtype over the generic `Pagination` machinery, specialized to the
// room event cache. All actual pagination logic lives in `Pagination`; this
// type only re-exposes a room-flavored public API.
pub struct RoomPagination(Pagination<Arc<RoomEventCacheInner>>);
55
56impl RoomPagination {
57    /// Construct a new [`RoomPagination`].
58    pub(in super::super::super) fn new(cache: Arc<RoomEventCacheInner>) -> Self {
59        Self(Pagination::new(cache))
60    }
61
62    /// Starts a back-pagination for the requested number of events.
63    ///
64    /// This automatically takes care of waiting for a pagination token from
65    /// sync, if we haven't done that before.
66    ///
67    /// It will run multiple back-paginations until one of these two conditions
68    /// is met:
69    /// - either we've reached the start of the timeline,
70    /// - or we've obtained enough events to fulfill the requested number of
71    ///   events.
72    pub async fn run_backwards_until(
73        &self,
74        num_requested_events: u16,
75    ) -> Result<BackPaginationOutcome> {
76        self.0.run_backwards_until(num_requested_events).await
77    }
78
79    /// Run a single back-pagination for the requested number of events.
80    ///
81    /// This automatically takes care of waiting for a pagination token from
82    /// sync, if we haven't done that before.
83    pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
84        self.0.run_backwards_once(batch_size).await
85    }
86
87    /// Returns a subscriber to the pagination status.
88    pub fn status(&self) -> Subscriber<PaginationStatus> {
89        self.0.cache.status().subscribe()
90    }
91}
92
impl PaginatedCache for Arc<RoomEventCacheInner> {
    // Expose the shared observable so the generic `Pagination` machinery can
    // publish status transitions that `RoomPagination::status()` subscribes to.
    fn status(&self) -> &SharedObservable<PaginationStatus> {
        &self.pagination_status
    }

    // Delegates to the cache state, under its write lock. The lock acquisition
    // itself is fallible (`await?`), hence the nested `?`.
    async fn load_more_events_backwards(&self) -> Result<LoadMoreEventsBackwardsOutcome> {
        self.state.write().await?.load_more_events_backwards().await
    }

    // Record that we've waited (once) for an initial previous-batch token from
    // sync, so subsequent paginations don't wait again.
    async fn mark_has_waited_for_initial_prev_token(&self) -> Result<()> {
        *self.state.write().await?.waited_for_initial_prev_token() = true;

        Ok(())
    }

    // Park until the notifier fires, signalling that a pagination batch token
    // has become available (presumably set on the sync path — confirm there).
    async fn wait_for_prev_token(&self) {
        self.pagination_batch_token_notifier.notified().await
    }

    // Perform one backwards `/messages` request.
    //
    // Returns `Ok(None)` when the owning client is gone (shutdown), otherwise
    // the raw event chunk plus the next previous-batch token (`end`), which is
    // `None` when the server has no further history.
    async fn paginate_backwards_with_network(
        &self,
        batch_size: u16,
        prev_token: &Option<String>,
    ) -> Result<Option<(Vec<Event>, Option<String>)>> {
        let Some(room) = self.weak_room.get() else {
            // The client is shutting down.
            return Ok(None);
        };

        // `dir=b`: paginate backwards, starting from `prev_token` (or from the
        // end of the timeline when there's no token yet).
        let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
        options.limit = batch_size.into();

        let response = room
            .messages(options)
            .await
            .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

        Ok(Some((response.chunk, response.end)))
    }

    // Finish a backwards pagination that was satisfied from the on-disk cache:
    // forward the timeline diffs (origin: cache) to observers, then shape the
    // outcome. No cache state is mutated here — the load already happened.
    async fn conclude_backwards_pagination_from_disk(
        &self,
        events: Vec<Event>,
        timeline_event_diffs: Vec<VectorDiff<Event>>,
        reached_start: bool,
    ) -> BackPaginationOutcome {
        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Cache,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        BackPaginationOutcome {
            reached_start,
            // This is a backwards pagination. `BackPaginationOutcome` expects events to
            // be in “reverse order”.
            events: events.into_iter().rev().collect(),
        }
    }

    // Finish a backwards pagination that hit the network: deduplicate the new
    // events, splice them (and any new gap) into the linked chunk in place of
    // the gap identified by `prev_token`, persist, and notify observers.
    //
    // Returns `Ok(None)` when the gap for `prev_token` vanished mid-request
    // (timeline reset), so the caller knows the result couldn't be applied.
    async fn conclude_backwards_pagination_from_network(
        &self,
        events: Vec<Event>,
        prev_token: Option<String>,
        mut new_token: Option<String>,
    ) -> Result<Option<BackPaginationOutcome>> {
        // Hold the state write lock for the whole splice, so the dedup check,
        // removal and insertion are atomic w.r.t. concurrent sync updates.
        let mut state = self.state.write().await?;

        // Check that the previous token still exists; otherwise it's a sign that the
        // room's timeline has been cleared.
        let prev_gap_id = if let Some(token) = prev_token {
            // Find the corresponding gap in the in-memory linked chunk.
            let gap_chunk_id = state.room_linked_chunk().chunk_identifier(|chunk| {
                    matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
                });

            if gap_chunk_id.is_none() {
                // We got a previous-batch token from the linked chunk *before* running the
                // request, but it is missing *after* completing the request.
                //
                // It may be a sign the linked chunk has been reset, but it's fine!
                return Ok(None);
            }

            gap_chunk_id
        } else {
            None
        };

        // Split the fresh batch into genuinely-new events and the IDs of those
        // already known in memory and/or in the store. `events` is rebound to
        // the full (deduplicated-annotated) list returned by the filter.
        let DeduplicationOutcome {
            all_events: mut events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
            non_empty_all_duplicates: all_duplicates,
        } = {
            let room_linked_chunk = state.room_linked_chunk();

            filter_duplicate_events(
                &state.state.own_user_id,
                &state.store,
                LinkedChunkId::Room(&state.state.room_id),
                room_linked_chunk,
                events,
            )
            .await?
        };

        // If not all the events have been back-paginated, we need to remove the
        // previous ones, otherwise we can end up with misordered events.
        //
        // Consider the following scenario:
        // - sync returns [D, E, F]
        // - then sync returns [] with a previous batch token PB1, so the internal
        //   linked chunk state is [D, E, F, PB1].
        // - back-paginating with PB1 may return [A, B, C, D, E, F].
        //
        // Only inserting the new events when replacing PB1 would result in a timeline
        // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
        // all the events, in case this happens (see also #4746).

        if !all_duplicates {
            // Let's forget all the previous events.
            state
                .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;
        } else {
            // All new events are duplicated, they can all be ignored.
            events.clear();
            // The gap can be ditched too, as it won't be useful to backpaginate any
            // further.
            new_token = None;
        }

        // `/messages` has been called with `dir=b` (backwards), so the events are in
        // the inverted order; reorder them.
        let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();

        let new_gap = new_token.map(|prev_token| Gap { prev_token });
        let reached_start = state.room_linked_chunk_mut().push_backwards_pagination_events(
            prev_gap_id,
            new_gap,
            &topo_ordered_events,
        );

        // Note: this flushes updates to the store.
        state
            .post_process_new_events(topo_ordered_events, PostProcessingOrigin::Backpagination)
            .await?;

        // Drain the accumulated linked-chunk updates into timeline diffs and
        // broadcast them (origin: pagination), mirroring the disk path above.
        let timeline_event_diffs = state.room_linked_chunk_mut().updates_as_vector_diffs();

        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Pagination,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        // Outcome keeps the network (reverse) order: `events` was never
        // reordered, only `topo_ordered_events` was.
        Ok(Some(BackPaginationOutcome { events, reached_start }))
    }
}
260}