Skip to main content

matrix_sdk_ui/timeline/
pagination.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_rx::StreamExt as _;
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::{StreamExt as _, pin_mut};
19use matrix_sdk::event_cache::PaginationStatus;
20use tracing::instrument;
21
22use super::Error;
23use crate::timeline::{
24    PaginationError::{self, NotSupported},
25    controller::TimelineFocusKind,
26};
27
28impl super::Timeline {
29    /// Add more events to the start of the timeline.
30    ///
31    /// Returns whether we hit the start of the timeline.
32    #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
33    pub async fn paginate_backwards(&self, mut num_events: u16) -> Result<bool, Error> {
34        match self.controller.focus() {
35            TimelineFocusKind::Live { .. } => {
36                match self.controller.live_lazy_paginate_backwards(num_events).await {
37                    Some(needed_num_events) => {
38                        num_events = needed_num_events.try_into().expect(
39                            "failed to cast `needed_num_events` (`usize`) into `num_events` (`usize`)",
40                        );
41                    }
42                    None => {
43                        // We could adjust the skip count to a lower value, while passing the
44                        // requested number of events. We *may* have reached the start of the
45                        // timeline, but since we're fulfilling the caller's request, assume it's
46                        // not the case and return false here. A subsequent call will go to the
47                        // `Some()` arm of this match, and cause a call to the event cache's
48                        // pagination.
49                        return Ok(false);
50                    }
51                }
52
53                Ok(self.live_paginate_backwards(num_events).await?)
54            }
55
56            TimelineFocusKind::Event { focused_event_id, thread_mode, .. } => Ok(self
57                .event_cache
58                .get_event_focused_cache(focused_event_id.clone(), (*thread_mode).into())
59                .await?
60                .ok_or(PaginationError::MissingCache)?
61                .paginate_backwards(num_events)
62                .await?
63                .hit_end_of_timeline),
64
65            TimelineFocusKind::Thread { root_event_id } => Ok(self
66                .event_cache
67                .thread_pagination(root_event_id.to_owned())
68                .await
69                .map_err(PaginationError::EventCache)?
70                .run_backwards_once(num_events)
71                .await
72                .map(|outcome| outcome.reached_start)?),
73
74            TimelineFocusKind::PinnedEvents => Err(Error::PaginationError(NotSupported)),
75        }
76    }
77
78    /// Add more events to the end of the timeline.
79    ///
80    /// Returns whether we hit the end of the timeline.
81    #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
82    pub async fn paginate_forwards(&self, num_events: u16) -> Result<bool, Error> {
83        match self.controller.focus() {
84            TimelineFocusKind::Live { .. } => Ok(true),
85
86            TimelineFocusKind::Event { focused_event_id, thread_mode, .. } => Ok(self
87                .event_cache
88                .get_event_focused_cache(focused_event_id.clone(), (*thread_mode).into())
89                .await?
90                .ok_or(PaginationError::MissingCache)?
91                .paginate_forwards(num_events)
92                .await?
93                .hit_end_of_timeline),
94
95            TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents => {
96                Err(Error::PaginationError(NotSupported))
97            }
98        }
99    }
100
101    /// Paginate backwards in live mode.
102    ///
103    /// This can only be called when the timeline is in live mode, not focused
104    /// on a specific event.
105    ///
106    /// Returns whether we hit the start of the timeline.
107    async fn live_paginate_backwards(&self, batch_size: u16) -> Result<bool, Error> {
108        loop {
109            match self.event_cache.pagination().run_backwards_once(batch_size).await {
110                Ok(outcome) => {
111                    if outcome.reached_start {
112                        self.controller.insert_timeline_start_if_missing().await;
113                        return Ok(true);
114                    }
115
116                    if !outcome.events.is_empty() {
117                        return Ok(false);
118                    }
119
120                    // Fallthrough: as a special contract, restart pagination,
121                    // if it returned 0 events.
122                }
123
124                // Propagate errors as such.
125                Err(err) => return Err(err.into()),
126            }
127        }
128    }
129
130    /// Subscribe to the back-pagination status of a live timeline.
131    ///
132    /// This will return `None` if the timeline is in the focused mode.
133    ///
134    /// Note: this may send multiple Paginating/Idle sequences during a single
135    /// call to [`Self::paginate_backwards()`].
136    pub async fn live_back_pagination_status(
137        &self,
138    ) -> Option<(PaginationStatus, impl Stream<Item = PaginationStatus> + use<>)> {
139        if !self.controller.is_live() {
140            return None;
141        }
142
143        let pagination = self.event_cache.pagination();
144
145        let mut status = pagination.status();
146
147        let current_value = self.controller.map_pagination_status(status.next_now()).await;
148
149        let controller = self.controller.clone();
150        let stream = Box::pin(stream! {
151            let status_stream = status.dedup();
152
153            pin_mut!(status_stream);
154
155            while let Some(state) = status_stream.next().await {
156                let state = controller.map_pagination_status(state).await;
157
158                match state {
159                    PaginationStatus::Idle { hit_timeline_start } => {
160                        if hit_timeline_start {
161                            controller.insert_timeline_start_if_missing().await;
162                        }
163                    }
164                    PaginationStatus::Paginating => {}
165                }
166
167                yield state;
168            }
169        });
170
171        Some((current_value, stream))
172    }
173}