Skip to main content

matrix_sdk_ui/timeline/
pagination.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_rx::StreamExt as _;
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::{StreamExt as _, pin_mut};
19use matrix_sdk::event_cache::{self, EventCacheError, PaginationStatus};
20use tracing::{instrument, warn};
21
22use super::Error;
23use crate::timeline::{PaginationError::NotSupported, controller::TimelineFocusKind};
24
25impl super::Timeline {
26    /// Add more events to the start of the timeline.
27    ///
28    /// Returns whether we hit the start of the timeline.
29    #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
30    pub async fn paginate_backwards(&self, mut num_events: u16) -> Result<bool, Error> {
31        match self.controller.focus() {
32            TimelineFocusKind::Live { .. } => {
33                match self.controller.live_lazy_paginate_backwards(num_events).await {
34                    Some(needed_num_events) => {
35                        num_events = needed_num_events.try_into().expect(
36                            "failed to cast `needed_num_events` (`usize`) into `num_events` (`usize`)",
37                        );
38                    }
39                    None => {
40                        // We could adjust the skip count to a lower value, while passing the
41                        // requested number of events. We *may* have reached
42                        // the start of the timeline, but since
43                        // we're fulfilling the caller's request, assume it's not the case and
44                        // return false here. A subsequent call will go to
45                        // the `Some()` arm of this match, and cause a call
46                        // to the event cache's pagination.
47                        return Ok(false);
48                    }
49                }
50
51                Ok(self.live_paginate_backwards(num_events).await?)
52            }
53            TimelineFocusKind::Event { .. } => {
54                Ok(self.controller.focused_paginate_backwards(num_events).await?)
55            }
56            TimelineFocusKind::Thread { root_event_id } => Ok(self
57                .event_cache
58                .thread_pagination(root_event_id.to_owned())
59                .run_backwards_once(num_events)
60                .await
61                .map(|outcome| outcome.reached_start)?),
62            TimelineFocusKind::PinnedEvents => Err(Error::PaginationError(NotSupported)),
63        }
64    }
65
66    /// Add more events to the end of the timeline.
67    ///
68    /// Returns whether we hit the end of the timeline.
69    #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
70    pub async fn paginate_forwards(&self, num_events: u16) -> Result<bool, Error> {
71        if self.controller.is_live() {
72            Ok(true)
73        } else {
74            Ok(self.controller.focused_paginate_forwards(num_events).await?)
75        }
76    }
77
78    /// Paginate backwards in live mode.
79    ///
80    /// This can only be called when the timeline is in live mode, not focused
81    /// on a specific event.
82    ///
83    /// Returns whether we hit the start of the timeline.
84    async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result<bool> {
85        loop {
86            match self.event_cache.pagination().run_backwards_once(batch_size).await {
87                Ok(outcome) => {
88                    // As an exceptional contract, restart the back-pagination if we received an
89                    // empty chunk.
90                    if outcome.reached_start || !outcome.events.is_empty() {
91                        if outcome.reached_start {
92                            self.controller.insert_timeline_start_if_missing().await;
93                        }
94                        return Ok(outcome.reached_start);
95                    }
96                }
97
98                Err(EventCacheError::AlreadyBackpaginating) => {
99                    // Treat an already running pagination exceptionally, returning false so that
100                    // the caller retries later.
101                    warn!("Another pagination request is already happening, returning early");
102                    return Ok(false);
103                }
104
105                // Propagate other errors as such.
106                Err(err) => return Err(err),
107            }
108        }
109    }
110
111    /// Subscribe to the back-pagination status of a live timeline.
112    ///
113    /// This will return `None` if the timeline is in the focused mode.
114    ///
115    /// Note: this may send multiple Paginating/Idle sequences during a single
116    /// call to [`Self::paginate_backwards()`].
117    pub async fn live_back_pagination_status(
118        &self,
119    ) -> Option<(PaginationStatus, impl Stream<Item = PaginationStatus> + use<>)> {
120        if !self.controller.is_live() {
121            return None;
122        }
123
124        let pagination = self.event_cache.pagination();
125
126        let mut status = pagination.status();
127
128        let current_value = self.controller.map_pagination_status(status.next_now()).await;
129
130        let controller = self.controller.clone();
131        let stream = Box::pin(stream! {
132            let status_stream = status.dedup();
133
134            pin_mut!(status_stream);
135
136            while let Some(state) = status_stream.next().await {
137                let state = controller.map_pagination_status(state).await;
138
139                match state {
140                    PaginationStatus::Idle { hit_timeline_start } => {
141                        if hit_timeline_start {
142                            controller.insert_timeline_start_if_missing().await;
143                        }
144                    }
145                    PaginationStatus::Paginating => {}
146                }
147
148                yield state;
149            }
150        });
151
152        Some((current_value, stream))
153    }
154}