matrix_sdk/event_cache/
pagination.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A sub-object for running pagination tasks on a given room.
16
17use std::{sync::Arc, time::Duration};
18
19use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk_base::timeout::timeout;
21use ruma::api::Direction;
22use tracing::{debug, instrument, trace};
23
24use super::{
25    room::{LoadMoreEventsBackwardsOutcome, RoomEventCacheInner},
26    BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheUpdate,
27};
28use crate::{
29    event_cache::{EventCacheError, RoomEventCacheGenericUpdate},
30    room::MessagesOptions,
31};
32
33/// Status for the back-pagination on a room event cache.
34#[derive(Debug, PartialEq, Clone, Copy)]
35#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
36pub enum RoomPaginationStatus {
37    /// No back-pagination is happening right now.
38    Idle {
39        /// Have we hit the start of the timeline, i.e. back-paginating wouldn't
40        /// have any effect?
41        hit_timeline_start: bool,
42    },
43
44    /// Back-pagination is already running in the background.
45    Paginating,
46}
47
48/// Small RAII guard to reset the pagination status on drop, if not disarmed in
49/// the meanwhile.
50struct ResetStatusOnDrop {
51    prev_status: Option<RoomPaginationStatus>,
52    pagination_status: SharedObservable<RoomPaginationStatus>,
53}
54
55impl ResetStatusOnDrop {
56    /// Make the RAII guard have no effect.
57    fn disarm(mut self) {
58        self.prev_status = None;
59    }
60}
61
62impl Drop for ResetStatusOnDrop {
63    fn drop(&mut self) {
64        if let Some(status) = self.prev_status.take() {
65            let _ = self.pagination_status.set(status);
66        }
67    }
68}
69
70/// An API object to run pagination queries on a [`super::RoomEventCache`].
71///
72/// Can be created with [`super::RoomEventCache::pagination()`].
73#[allow(missing_debug_implementations)]
74#[derive(Clone)]
75pub struct RoomPagination {
76    pub(super) inner: Arc<RoomEventCacheInner>,
77}
78
79impl RoomPagination {
80    /// Starts a back-pagination for the requested number of events.
81    ///
82    /// This automatically takes care of waiting for a pagination token from
83    /// sync, if we haven't done that before.
84    ///
85    /// It will run multiple back-paginations until one of these two conditions
86    /// is met:
87    /// - either we've reached the start of the timeline,
88    /// - or we've obtained enough events to fulfill the requested number of
89    ///   events.
90    #[instrument(skip(self))]
91    pub async fn run_backwards_until(
92        &self,
93        num_requested_events: u16,
94    ) -> Result<BackPaginationOutcome> {
95        let mut events = Vec::new();
96
97        loop {
98            if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
99                events.extend(outcome.events);
100                if outcome.reached_start || events.len() >= num_requested_events as usize {
101                    return Ok(BackPaginationOutcome {
102                        reached_start: outcome.reached_start,
103                        events,
104                    });
105                }
106                trace!(
107                    "restarting back-pagination, because we haven't reached \
108                     the start or obtained enough events yet"
109                );
110            }
111
112            debug!("restarting back-pagination because of a timeline reset.");
113        }
114    }
115
116    /// Run a single back-pagination for the requested number of events.
117    ///
118    /// This automatically takes care of waiting for a pagination token from
119    /// sync, if we haven't done that before.
120    #[instrument(skip(self))]
121    pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
122        loop {
123            if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
124                return Ok(outcome);
125            }
126            debug!("restarting back-pagination because of a timeline reset.");
127        }
128    }
129
130    /// Paginate from either the storage or the network, and let pagination
131    /// status observers know about updates.
132    async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
133        // There is at least one gap that must be resolved; reach the network.
134        // First, ensure there's no other ongoing back-pagination.
135        let status_observable = &self.inner.pagination_status;
136
137        let prev_status = status_observable.set(RoomPaginationStatus::Paginating);
138        if !matches!(prev_status, RoomPaginationStatus::Idle { .. }) {
139            return Err(EventCacheError::AlreadyBackpaginating);
140        }
141
142        let reset_status_on_drop_guard = ResetStatusOnDrop {
143            prev_status: Some(prev_status),
144            pagination_status: status_observable.clone(),
145        };
146
147        match self.paginate_backwards_impl(batch_size).await? {
148            Some(outcome) => {
149                // Back-pagination's over and successful, don't reset the status to the previous
150                // value.
151                reset_status_on_drop_guard.disarm();
152
153                // Notify subscribers that pagination ended.
154                status_observable
155                    .set(RoomPaginationStatus::Idle { hit_timeline_start: outcome.reached_start });
156
157                Ok(Some(outcome))
158            }
159
160            None => {
161                // We keep the previous status value, because we haven't obtained more
162                // information about the pagination.
163                Ok(None)
164            }
165        }
166    }
167
168    /// Paginate from either the storage or the network.
169    ///
170    /// This method isn't concerned with setting the pagination status; only the
171    /// caller is.
172    async fn paginate_backwards_impl(
173        &self,
174        batch_size: u16,
175    ) -> Result<Option<BackPaginationOutcome>> {
176        // A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
177        // to load from storage first, then from network if storage indicated
178        // there's no previous events chunk to load.
179
180        loop {
181            let mut state_guard = self.inner.state.write().await;
182
183            match state_guard.load_more_events_backwards().await? {
184                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
185                    const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
186
187                    // Release the state guard while waiting, to not deadlock the sync task.
188                    drop(state_guard);
189
190                    // Otherwise, wait for a notification that we received a previous-batch token.
191                    trace!("waiting for a pagination token…");
192                    let _ = timeout(
193                        self.inner.pagination_batch_token_notifier.notified(),
194                        DEFAULT_WAIT_FOR_TOKEN_DURATION,
195                    )
196                    .await;
197                    trace!("done waiting");
198
199                    self.inner.state.write().await.waited_for_initial_prev_token = true;
200
201                    // Retry!
202                    //
203                    // Note: the next call to `load_more_events_backwards` can't return
204                    // `WaitForInitialPrevToken` because we've just set to
205                    // `waited_for_initial_prev_token`, so this is not an infinite loop.
206                    //
207                    // Note 2: not a recursive call, because recursive and async have a bad time
208                    // together.
209                    continue;
210                }
211
212                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
213                    // We have a gap, so resolve it with a network back-pagination.
214                    drop(state_guard);
215                    return self.paginate_backwards_with_network(batch_size, prev_token).await;
216                }
217
218                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
219                    return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
220                }
221
222                LoadMoreEventsBackwardsOutcome::Events {
223                    events,
224                    timeline_event_diffs,
225                    reached_start,
226                } => {
227                    if !timeline_event_diffs.is_empty() {
228                        let _ =
229                            self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
230                                diffs: timeline_event_diffs,
231                                origin: EventsOrigin::Cache,
232                            });
233
234                        // Send a room event cache generic update.
235                        let _ =
236                            self.inner.generic_update_sender.send(RoomEventCacheGenericUpdate {
237                                room_id: self.inner.room_id.clone(),
238                            });
239                    }
240
241                    return Ok(Some(BackPaginationOutcome {
242                        reached_start,
243                        // This is a backwards pagination. `BackPaginationOutcome` expects events to
244                        // be in “reverse order”.
245                        events: events.into_iter().rev().collect(),
246                    }));
247                }
248            }
249        }
250    }
251
252    /// Run a single pagination request (/messages) to the server.
253    ///
254    /// If there are no previous-batch tokens, it will wait for one for a short
255    /// while to get one, or if it's already done so or if it's seen a
256    /// previous-batch token before, it will immediately indicate it's
257    /// reached the end of the timeline.
258    async fn paginate_backwards_with_network(
259        &self,
260        batch_size: u16,
261        prev_token: Option<String>,
262    ) -> Result<Option<BackPaginationOutcome>> {
263        let (events, new_token) = {
264            let Some(room) = self.inner.weak_room.get() else {
265                // The client is shutting down, return an empty default response.
266                return Ok(Some(BackPaginationOutcome {
267                    reached_start: false,
268                    events: Default::default(),
269                }));
270            };
271
272            let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
273            options.limit = batch_size.into();
274
275            let response = room
276                .messages(options)
277                .await
278                .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
279
280            (response.chunk, response.end)
281        };
282
283        if let Some((outcome, timeline_event_diffs)) = self
284            .inner
285            .state
286            .write()
287            .await
288            .handle_backpagination(events, new_token, prev_token)
289            .await?
290        {
291            if !timeline_event_diffs.is_empty() {
292                let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
293                    diffs: timeline_event_diffs,
294                    origin: EventsOrigin::Pagination,
295                });
296            }
297
298            Ok(Some(outcome))
299        } else {
300            // The previous token has gone missing, so the timeline has been reset in the
301            // meanwhile, but it's fine per this function's contract.
302            Ok(None)
303        }
304    }
305
306    /// Returns a subscriber to the pagination status used for the
307    /// back-pagination integrated to the event cache.
308    pub fn status(&self) -> Subscriber<RoomPaginationStatus> {
309        self.inner.pagination_status.subscribe()
310    }
311}