matrix_sdk/event_cache/
pagination.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A sub-object for running pagination tasks on a given room.
16
17use std::{
18    sync::{Arc, atomic::Ordering},
19    time::Duration,
20};
21
22use eyeball::{SharedObservable, Subscriber};
23use matrix_sdk_base::timeout::timeout;
24use ruma::api::Direction;
25use tracing::{debug, instrument, trace};
26
27use super::{
28    BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheUpdate,
29    room::{LoadMoreEventsBackwardsOutcome, RoomEventCacheInner},
30};
31use crate::{
32    event_cache::{EventCacheError, RoomEventCacheGenericUpdate},
33    room::MessagesOptions,
34};
35
/// Status for the back-pagination on a room event cache.
///
/// Equality on this type is total (the only payload is a `bool`), so it
/// implements [`Eq`] in addition to [`PartialEq`].
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum RoomPaginationStatus {
    /// No back-pagination is happening right now.
    Idle {
        /// Have we hit the start of the timeline, i.e. back-paginating wouldn't
        /// have any effect?
        hit_timeline_start: bool,
    },

    /// Back-pagination is already running in the background.
    Paginating,
}
50
51/// Small RAII guard to reset the pagination status on drop, if not disarmed in
52/// the meanwhile.
53struct ResetStatusOnDrop {
54    prev_status: Option<RoomPaginationStatus>,
55    pagination_status: SharedObservable<RoomPaginationStatus>,
56}
57
58impl ResetStatusOnDrop {
59    /// Make the RAII guard have no effect.
60    fn disarm(mut self) {
61        self.prev_status = None;
62    }
63}
64
65impl Drop for ResetStatusOnDrop {
66    fn drop(&mut self) {
67        if let Some(status) = self.prev_status.take() {
68            let _ = self.pagination_status.set(status);
69        }
70    }
71}
72
73/// An API object to run pagination queries on a [`super::RoomEventCache`].
74///
75/// Can be created with [`super::RoomEventCache::pagination()`].
76#[allow(missing_debug_implementations)]
77#[derive(Clone)]
78pub struct RoomPagination {
79    pub(super) inner: Arc<RoomEventCacheInner>,
80}
81
82impl RoomPagination {
83    /// Starts a back-pagination for the requested number of events.
84    ///
85    /// This automatically takes care of waiting for a pagination token from
86    /// sync, if we haven't done that before.
87    ///
88    /// It will run multiple back-paginations until one of these two conditions
89    /// is met:
90    /// - either we've reached the start of the timeline,
91    /// - or we've obtained enough events to fulfill the requested number of
92    ///   events.
93    #[instrument(skip(self))]
94    pub async fn run_backwards_until(
95        &self,
96        num_requested_events: u16,
97    ) -> Result<BackPaginationOutcome> {
98        let mut events = Vec::new();
99
100        loop {
101            if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
102                events.extend(outcome.events);
103                if outcome.reached_start || events.len() >= num_requested_events as usize {
104                    return Ok(BackPaginationOutcome {
105                        reached_start: outcome.reached_start,
106                        events,
107                    });
108                }
109                trace!(
110                    "restarting back-pagination, because we haven't reached \
111                     the start or obtained enough events yet"
112                );
113            }
114
115            debug!("restarting back-pagination because of a timeline reset.");
116        }
117    }
118
119    /// Run a single back-pagination for the requested number of events.
120    ///
121    /// This automatically takes care of waiting for a pagination token from
122    /// sync, if we haven't done that before.
123    #[instrument(skip(self))]
124    pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
125        loop {
126            if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
127                return Ok(outcome);
128            }
129            debug!("restarting back-pagination because of a timeline reset.");
130        }
131    }
132
133    /// Paginate from either the storage or the network, and let pagination
134    /// status observers know about updates.
135    ///
136    /// Returns `Ok(None)` if the pagination token used during a network
137    /// pagination has disappeared from the in-memory linked chunk after
138    /// handling the response.
139    async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
140        // There is at least one gap that must be resolved; reach the network.
141        // First, ensure there's no other ongoing back-pagination.
142        let status_observable = &self.inner.pagination_status;
143
144        let prev_status = status_observable.set(RoomPaginationStatus::Paginating);
145        if !matches!(prev_status, RoomPaginationStatus::Idle { .. }) {
146            return Err(EventCacheError::AlreadyBackpaginating);
147        }
148
149        let reset_status_on_drop_guard = ResetStatusOnDrop {
150            prev_status: Some(prev_status),
151            pagination_status: status_observable.clone(),
152        };
153
154        match self.paginate_backwards_impl(batch_size).await? {
155            Some(outcome) => {
156                // Back-pagination's over and successful, don't reset the status to the previous
157                // value.
158                reset_status_on_drop_guard.disarm();
159
160                // Notify subscribers that pagination ended.
161                status_observable
162                    .set(RoomPaginationStatus::Idle { hit_timeline_start: outcome.reached_start });
163
164                Ok(Some(outcome))
165            }
166
167            None => Ok(None),
168        }
169    }
170
171    /// Paginate from either the storage or the network.
172    ///
173    /// This method isn't concerned with setting the pagination status; only the
174    /// caller is.
175    ///
176    /// Returns `Ok(None)` if the pagination token used during a network
177    /// pagination has disappeared from the in-memory linked chunk after
178    /// handling the response.
179    async fn paginate_backwards_impl(
180        &self,
181        batch_size: u16,
182    ) -> Result<Option<BackPaginationOutcome>> {
183        // A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
184        // to load from storage first, then from network if storage indicated
185        // there's no previous events chunk to load.
186
187        loop {
188            let mut state_guard = self.inner.state.write().await?;
189
190            match state_guard.load_more_events_backwards().await? {
191                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
192                    if prev_token.is_none()
193                        && !state_guard.waited_for_initial_prev_token().load(Ordering::SeqCst)
194                    {
195                        // We didn't reload a pagination token, and we haven't waited for one; wait
196                        // and start over.
197
198                        const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
199
200                        // Release the state guard while waiting, to not deadlock the sync task.
201                        drop(state_guard);
202
203                        // Otherwise, wait for a notification that we received a previous-batch
204                        // token.
205                        trace!("waiting for a pagination token…");
206                        let _ = timeout(
207                            self.inner.pagination_batch_token_notifier.notified(),
208                            DEFAULT_WAIT_FOR_TOKEN_DURATION,
209                        )
210                        .await;
211                        trace!("done waiting");
212
213                        self.inner
214                            .state
215                            .write()
216                            .await?
217                            .waited_for_initial_prev_token()
218                            .store(true, Ordering::SeqCst);
219
220                        // Retry!
221                        //
222                        // Note: the next call to `load_more_events_backwards` can't return
223                        // `WaitForInitialPrevToken` because we've just set to
224                        // `waited_for_initial_prev_token`, so this is not an infinite loop.
225                        //
226                        // Note 2: not a recursive call, because recursive and async have a bad time
227                        // together.
228                        continue;
229                    }
230
231                    // We have a gap, so resolve it with a network back-pagination.
232                    drop(state_guard);
233                    return self.paginate_backwards_with_network(batch_size, prev_token).await;
234                }
235
236                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
237                    return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
238                }
239
240                LoadMoreEventsBackwardsOutcome::Events {
241                    events,
242                    timeline_event_diffs,
243                    reached_start,
244                } => {
245                    if !timeline_event_diffs.is_empty() {
246                        let _ = self.inner.update_sender.send(
247                            RoomEventCacheUpdate::UpdateTimelineEvents {
248                                diffs: timeline_event_diffs,
249                                origin: EventsOrigin::Cache,
250                            },
251                        );
252
253                        // Send a room event cache generic update.
254                        let _ =
255                            self.inner.generic_update_sender.send(RoomEventCacheGenericUpdate {
256                                room_id: self.inner.room_id.clone(),
257                            });
258                    }
259
260                    return Ok(Some(BackPaginationOutcome {
261                        reached_start,
262                        // This is a backwards pagination. `BackPaginationOutcome` expects events to
263                        // be in “reverse order”.
264                        events: events.into_iter().rev().collect(),
265                    }));
266                }
267            }
268        }
269    }
270
271    /// Run a single pagination request (/messages) to the server.
272    ///
273    /// If there are no previous-batch tokens, it will wait for one for a short
274    /// while to get one, or if it's already done so or if it's seen a
275    /// previous-batch token before, it will immediately indicate it's
276    /// reached the end of the timeline.
277    ///
278    /// Returns `Ok(None)` if the pagination token used during the request has
279    /// disappeared from the in-memory linked chunk after handling the
280    /// response.
281    async fn paginate_backwards_with_network(
282        &self,
283        batch_size: u16,
284        prev_token: Option<String>,
285    ) -> Result<Option<BackPaginationOutcome>> {
286        let (events, new_token) = {
287            let Some(room) = self.inner.weak_room.get() else {
288                // The client is shutting down, return an empty default response.
289                return Ok(Some(BackPaginationOutcome {
290                    reached_start: false,
291                    events: Default::default(),
292                }));
293            };
294
295            let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
296            options.limit = batch_size.into();
297
298            let response = room
299                .messages(options)
300                .await
301                .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
302
303            (response.chunk, response.end)
304        };
305
306        if let Some((outcome, timeline_event_diffs)) = self
307            .inner
308            .state
309            .write()
310            .await?
311            .handle_backpagination(events, new_token, prev_token)
312            .await?
313        {
314            if !timeline_event_diffs.is_empty() {
315                let _ = self.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
316                    diffs: timeline_event_diffs,
317                    origin: EventsOrigin::Pagination,
318                });
319            }
320
321            Ok(Some(outcome))
322        } else {
323            // The previous token has gone missing, so the timeline has been reset in the
324            // meanwhile, but it's fine per this function's contract.
325            Ok(None)
326        }
327    }
328
329    /// Returns a subscriber to the pagination status used for the
330    /// back-pagination integrated to the event cache.
331    pub fn status(&self) -> Subscriber<RoomPaginationStatus> {
332        self.inner.pagination_status.subscribe()
333    }
334}