// matrix_sdk/event_cache/caches/room/pagination.rs
// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The [`RoomPagination`] type makes it possible to paginate a
//! [`RoomEventCache`].
//!
//! [`RoomEventCache`]: super::super::super::RoomEventCache

use std::{
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use eyeball::{SharedObservable, Subscriber};
use eyeball_im::VectorDiff;
use futures_core::{Stream, ready};
use matrix_sdk_base::{
    event_cache::{Event, Gap},
    linked_chunk::{ChunkContent, LinkedChunkId, Update},
};
use pin_project_lite::pin_project;
use ruma::api::Direction;
use tracing::{error, trace};

pub use super::super::pagination::PaginationStatus;
use super::{
    super::{
        super::{
            EventCacheError, EventsOrigin, Result, RoomEventCacheGenericUpdate,
            deduplicator::{DeduplicationOutcome, filter_duplicate_events},
        },
        TimelineVectorDiffs,
        pagination::{
            BackPaginationOutcome, LoadMoreEventsBackwardsOutcome, PaginatedCache, Pagination,
        },
    },
    PostProcessingOrigin, RoomEventCacheInner, RoomEventCacheUpdate,
};
use crate::{event_cache::caches::pagination::SharedPaginationStatus, room::MessagesOptions};

pin_project! {
    /// A subscriber to a [`PaginationStatus`].
    ///
    /// This is a manual implementation of a map function on top of an internal type
    /// representing a [`PaginationStatus`].
    pub struct PaginationStatusSubscriber {
        // The underlying observable subscriber over the internal status type.
        // Pinned so the `Stream` implementation below can poll it through a
        // pin projection.
        #[pin]
        subscriber: Subscriber<SharedPaginationStatus>,
    }
}

64#[cfg(not(tarpaulin_include))]
65impl std::fmt::Debug for PaginationStatusSubscriber {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        f.debug_struct("PaginationStatusSubscriber").finish_non_exhaustive()
68    }
69}
70
71impl PaginationStatusSubscriber {
72    fn map(from: SharedPaginationStatus) -> PaginationStatus {
73        match from {
74            SharedPaginationStatus::Idle { hit_timeline_start } => {
75                PaginationStatus::Idle { hit_timeline_start }
76            }
77            SharedPaginationStatus::Paginating { .. } => PaginationStatus::Paginating,
78        }
79    }
80
81    pub fn get(&self) -> PaginationStatus {
82        Self::map(self.subscriber.get())
83    }
84
85    pub async fn next(&mut self) -> Option<PaginationStatus> {
86        self.subscriber.next().await.map(Self::map)
87    }
88
89    pub fn next_now(&mut self) -> PaginationStatus {
90        Self::map(self.subscriber.next_now())
91    }
92}
93
94impl Stream for PaginationStatusSubscriber {
95    type Item = PaginationStatus;
96
97    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
98        Poll::Ready(ready!(self.project().subscriber.as_mut().poll_next(cx)).map(Self::map))
99    }
100}
101
/// An API object to run pagination queries on a [`RoomEventCache`].
///
/// Can be created with [`RoomEventCache::pagination()`].
///
/// [`RoomEventCache`]: super::super::super::RoomEventCache
/// [`RoomEventCache::pagination()`]: super::super::super::RoomEventCache::pagination
#[allow(missing_debug_implementations)]
#[derive(Clone)]
// Newtype wrapper over the generic `Pagination` helper, specialized to the
// room event cache inner state.
pub struct RoomPagination(Pagination<Arc<RoomEventCacheInner>>);

112impl RoomPagination {
113    /// Construct a new [`RoomPagination`].
114    pub(super) fn new(cache: Arc<RoomEventCacheInner>) -> Self {
115        Self(Pagination::new(cache))
116    }
117
118    /// Starts a back-pagination for the requested number of events.
119    ///
120    /// This automatically takes care of waiting for a pagination token from
121    /// sync, if we haven't done that before.
122    ///
123    /// It will run multiple back-paginations until one of these two conditions
124    /// is met:
125    /// - either we've reached the start of the timeline,
126    /// - or we've obtained enough events to fulfill the requested number of
127    ///   events.
128    pub async fn run_backwards_until(
129        &self,
130        num_requested_events: u16,
131    ) -> Result<BackPaginationOutcome> {
132        self.0.run_backwards_until(num_requested_events).await
133    }
134
135    /// Run a single back-pagination for the requested number of events.
136    ///
137    /// This automatically takes care of waiting for a pagination token from
138    /// sync, if we haven't done that before.
139    pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
140        self.0.run_backwards_once(batch_size).await
141    }
142
143    /// Returns a subscriber to the pagination status.
144    pub fn status(&self) -> PaginationStatusSubscriber {
145        PaginationStatusSubscriber { subscriber: self.0.cache.status().subscribe() }
146    }
147
148    #[cfg(test)]
149    pub(super) async fn load_more_events_backwards(
150        &self,
151    ) -> Result<LoadMoreEventsBackwardsOutcome> {
152        self.0.cache.load_more_events_backwards().await
153    }
154}
155
impl PaginatedCache for Arc<RoomEventCacheInner> {
    /// Accessor to the shared pagination status observable for this room.
    fn status(&self) -> &SharedObservable<SharedPaginationStatus> {
        &self.shared_pagination_status
    }

    /// Try to load one more chunk of events backwards, from the store.
    ///
    /// Returns either a gap to resolve over the network (with its
    /// previous-batch token, if any), the events of the chunk that was loaded
    /// from disk, or a marker that the start of the timeline was reached.
    ///
    /// On store errors, the room's stored linked chunk is cleared before the
    /// error is propagated.
    async fn load_more_events_backwards(&self) -> Result<LoadMoreEventsBackwardsOutcome> {
        let mut state = self.state.write().await?;

        // If any in-memory chunk is a gap, don't load more events, and let the caller
        // resolve the gap.
        if let Some(prev_token) = state.room_linked_chunk().rgap().map(|gap| gap.token) {
            return Ok(LoadMoreEventsBackwardsOutcome::Gap {
                prev_token: Some(prev_token),
                waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
            });
        }

        let prev_first_chunk =
            state.room_linked_chunk().chunks().next().expect("a linked chunk is never empty");

        // The first chunk is not a gap, we can load its previous chunk.
        let linked_chunk_id = LinkedChunkId::Room(&state.state.room_id);
        let new_first_chunk = match state
            .store
            .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
            .await
        {
            Ok(Some(new_first_chunk)) => {
                // All good, let's continue with this chunk.
                new_first_chunk
            }

            Ok(None) => {
                // If we never received events for this room, this means we've never received a
                // sync for that room, because every room must have *at least* a room creation
                // event. Otherwise, we have reached the start of the timeline.

                if state.room_linked_chunk().events().next().is_some() {
                    // If there's at least one event, this means we've reached the start of the
                    // timeline, since the chunk is fully loaded.
                    trace!("chunk is fully loaded and non-empty: reached_start=true");
                    return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
                }

                // Otherwise, start back-pagination from the end of the room.
                return Ok(LoadMoreEventsBackwardsOutcome::Gap {
                    prev_token: None,
                    waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
                });
            }

            Err(err) => {
                error!("error when loading the previous chunk of a linked chunk: {err}");

                // Clear storage for this room.
                state
                    .store
                    .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                    .await?;

                // Return the error.
                return Err(err.into());
            }
        };

        let chunk_content = new_first_chunk.content.clone();

        // We've reached the start on disk, if and only if, there was no chunk prior to
        // the one we just loaded.
        //
        // This value is correct, if and only if, it is used for a chunk content of kind
        // `Items`.
        let reached_start = new_first_chunk.previous.is_none();

        if let Err(err) = state.room_linked_chunk_mut().insert_new_chunk_as_first(new_first_chunk) {
            error!("error when inserting the previous chunk into its linked chunk: {err}");

            // Clear storage for this room.
            state
                .store
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(&state.state.room_id),
                    vec![Update::Clear],
                )
                .await?;

            // Return the error.
            return Err(err.into());
        }

        // ⚠️ Let's not propagate the updates to the store! We already have these data
        // in the store! Let's drain them.
        let _ = state.room_linked_chunk_mut().store_updates().take();

        // However, we want to get updates as `VectorDiff`s.
        let timeline_event_diffs = state.room_linked_chunk_mut().updates_as_vector_diffs();

        Ok(match chunk_content {
            ChunkContent::Gap(gap) => {
                trace!("reloaded chunk from disk (gap)");

                LoadMoreEventsBackwardsOutcome::Gap {
                    prev_token: Some(gap.token),
                    waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
                }
            }

            ChunkContent::Items(events) => {
                trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());

                LoadMoreEventsBackwardsOutcome::Events {
                    events,
                    timeline_event_diffs,
                    reached_start,
                }
            }
        })
    }

    /// Record that we've waited at least once for an initial previous-batch
    /// token, by setting the flag in the room state.
    async fn mark_has_waited_for_initial_prev_token(&self) -> Result<()> {
        *self.state.write().await?.waited_for_initial_prev_token_mut() = true;

        Ok(())
    }

    /// Wait until the pagination batch token notifier fires.
    ///
    /// NOTE(review): presumably the notifier is triggered when a
    /// previous-batch token arrives from sync — the notifying side is not
    /// visible in this file; confirm against the caller.
    async fn wait_for_prev_token(&self) {
        self.pagination_batch_token_notifier.notified().await
    }

    /// Run a single backwards `/messages` request against the network.
    ///
    /// Returns `Ok(None)` if the client is shutting down (the weak room
    /// reference could not be upgraded); otherwise the events from the
    /// response chunk and the new end (previous-batch) token.
    async fn paginate_backwards_with_network(
        &self,
        batch_size: u16,
        prev_token: &Option<String>,
    ) -> Result<Option<(Vec<Event>, Option<String>)>> {
        let Some(room) = self.weak_room.get() else {
            // The client is shutting down.
            return Ok(None);
        };

        let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
        options.limit = batch_size.into();

        let response = room
            .messages(options)
            .await
            .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?;

        Ok(Some((response.chunk, response.end)))
    }

    /// Finish a back-pagination that was satisfied from disk: forward the
    /// timeline diffs (if any) to the update channel, and build the outcome.
    async fn conclude_backwards_pagination_from_disk(
        &self,
        events: Vec<Event>,
        timeline_event_diffs: Vec<VectorDiff<Event>>,
        reached_start: bool,
    ) -> BackPaginationOutcome {
        // Only notify observers when something actually changed.
        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Cache,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        BackPaginationOutcome {
            reached_start,
            // This is a backwards pagination. `BackPaginationOutcome` expects events to
            // be in “reverse order”.
            events: events.into_iter().rev().collect(),
        }
    }

    /// Finish a back-pagination that hit the network: deduplicate the new
    /// events, replace the resolved gap in the linked chunk, post-process the
    /// new events, and forward timeline diffs to observers.
    ///
    /// Returns `Ok(None)` when the gap the request was based on has vanished
    /// (a sign the linked chunk was reset while the request was in flight).
    async fn conclude_backwards_pagination_from_network(
        &self,
        events: Vec<Event>,
        prev_token: Option<String>,
        mut new_token: Option<String>,
    ) -> Result<Option<BackPaginationOutcome>> {
        let mut state = self.state.write().await?;

        // Check that the previous token still exists; otherwise it's a sign that the
        // room's timeline has been cleared.
        let prev_gap_id = if let Some(token) = prev_token {
            // Find the corresponding gap in the in-memory linked chunk.
            let gap_chunk_id = state.room_linked_chunk().chunk_identifier(|chunk| {
                    matches!(chunk.content(), ChunkContent::Gap(Gap { token: prev_token }) if *prev_token == token)
                });

            if gap_chunk_id.is_none() {
                // We got a previous-batch token from the linked chunk *before* running the
                // request, but it is missing *after* completing the request.
                //
                // It may be a sign the linked chunk has been reset, but it's fine!
                return Ok(None);
            }

            gap_chunk_id
        } else {
            None
        };

        let DeduplicationOutcome {
            all_events: mut events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
            non_empty_all_duplicates: all_duplicates,
        } = {
            let room_linked_chunk = state.room_linked_chunk();

            filter_duplicate_events(
                &state.state.own_user_id,
                &state.store,
                LinkedChunkId::Room(&state.state.room_id),
                room_linked_chunk,
                events,
            )
            .await?
        };

        // If not all the events have been back-paginated, we need to remove the
        // previous ones, otherwise we can end up with misordered events.
        //
        // Consider the following scenario:
        // - sync returns [D, E, F]
        // - then sync returns [] with a previous batch token PB1, so the internal
        //   linked chunk state is [D, E, F, PB1].
        // - back-paginating with PB1 may return [A, B, C, D, E, F].
        //
        // Only inserting the new events when replacing PB1 would result in a timeline
        // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
        // all the events, in case this happens (see also #4746).

        if !all_duplicates {
            // Let's forget all the previous events.
            state
                .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;
        } else {
            // All new events are duplicated, they can all be ignored.
            events.clear();
            // The gap can be ditched too, as it won't be useful to backpaginate any
            // further.
            new_token = None;
        }

        // `/messages` has been called with `dir=b` (backwards), so the events are in
        // the inverted order; reorder them.
        let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();

        let new_gap = new_token.map(|prev_token| Gap { token: prev_token });
        let reached_start = state.room_linked_chunk_mut().push_backwards_pagination_events(
            prev_gap_id,
            new_gap,
            &topo_ordered_events,
        );

        // A back-pagination can't include new read receipt events, as those are
        // ephemeral events not included in /messages responses, so we can
        // safely set the receipt event to None here.
        //
        // Note: read receipts may be updated anyhow in the post-processing step, as the
        // back-pagination may have revealed the event pointed to by the latest read
        // receipt.
        let receipt_event = None;

        // Note: this flushes updates to the store.
        state
            .post_process_new_events(
                topo_ordered_events,
                PostProcessingOrigin::Backpagination,
                receipt_event,
            )
            .await?;

        let timeline_event_diffs = state.room_linked_chunk_mut().updates_as_vector_diffs();

        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Pagination,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        Ok(Some(BackPaginationOutcome { events, reached_start }))
    }
}