Skip to main content

matrix_sdk_ui/timeline/
pagination.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_rx::StreamExt as _;
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::{StreamExt as _, pin_mut};
19use matrix_sdk::event_cache::{PaginationStatus, RoomPagination};
20use tracing::instrument;
21
22use super::Error;
23use crate::timeline::{PaginationError::NotSupported, controller::TimelineFocusKind};
24
25impl super::Timeline {
26    /// Add more events to the start of the timeline.
27    ///
28    /// Returns whether we hit the start of the timeline.
29    #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
30    pub async fn paginate_backwards(&self, mut num_events: u16) -> Result<bool, Error> {
31        match self.controller.focus() {
32            TimelineFocusKind::Live { event_cache, .. } => {
33                match self.controller.live_lazy_paginate_backwards(num_events).await {
34                    Some(needed_num_events) => {
35                        num_events = needed_num_events.try_into().expect(
36                            "failed to cast `needed_num_events` (`usize`) into `num_events` (`usize`)",
37                        );
38                    }
39                    None => {
40                        // We could adjust the skip count to a lower value, while passing the
41                        // requested number of events. We *may* have reached the start of the
42                        // timeline, but since we're fulfilling the caller's request, assume it's
43                        // not the case and return false here. A subsequent call will go to the
44                        // `Some()` arm of this match, and cause a call to the event cache's
45                        // pagination.
46                        return Ok(false);
47                    }
48                }
49
50                Ok(self.live_paginate_backwards(&event_cache.pagination(), num_events).await?)
51            }
52
53            TimelineFocusKind::Event { event_cache, .. } => {
54                Ok(event_cache.paginate_backwards(num_events).await?.hit_end_of_timeline)
55            }
56
57            TimelineFocusKind::Thread { event_cache, .. } => Ok(event_cache
58                .pagination()
59                .run_backwards_once(num_events)
60                .await
61                .map(|outcome| outcome.reached_start)?),
62
63            TimelineFocusKind::PinnedEvents { .. } => Err(Error::PaginationError(NotSupported)),
64        }
65    }
66
67    /// Add more events to the end of the timeline.
68    ///
69    /// Returns whether we hit the end of the timeline.
70    #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
71    pub async fn paginate_forwards(&self, num_events: u16) -> Result<bool, Error> {
72        match self.controller.focus() {
73            TimelineFocusKind::Live { .. } => Ok(true),
74
75            TimelineFocusKind::Event { event_cache, .. } => {
76                Ok(event_cache.paginate_forwards(num_events).await?.hit_end_of_timeline)
77            }
78
79            TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => {
80                Err(Error::PaginationError(NotSupported))
81            }
82        }
83    }
84
85    /// Paginate backwards in live mode.
86    ///
87    /// This can only be called when the timeline is in live mode, not focused
88    /// on a specific event.
89    ///
90    /// Returns whether we hit the start of the timeline.
91    async fn live_paginate_backwards(
92        &self,
93        event_cache_pagination: &RoomPagination,
94        batch_size: u16,
95    ) -> Result<bool, Error> {
96        loop {
97            match event_cache_pagination.run_backwards_once(batch_size).await {
98                Ok(outcome) => {
99                    if outcome.reached_start {
100                        self.controller.insert_timeline_start_if_missing().await;
101                        return Ok(true);
102                    }
103
104                    if !outcome.events.is_empty() {
105                        return Ok(false);
106                    }
107
108                    // Fallthrough: as a special contract, restart pagination,
109                    // if it returned 0 events.
110                }
111
112                // Propagate errors as such.
113                Err(err) => return Err(err.into()),
114            }
115        }
116    }
117
118    /// Subscribe to the back-pagination status of a live timeline.
119    ///
120    /// This will return `None` if the timeline is in the focused mode.
121    ///
122    /// Note: this may send multiple Paginating/Idle sequences during a single
123    /// call to [`Self::paginate_backwards()`].
124    pub async fn live_back_pagination_status(
125        &self,
126    ) -> Option<(PaginationStatus, impl Stream<Item = PaginationStatus> + use<>)> {
127        let TimelineFocusKind::Live { event_cache, .. } = self.controller.focus() else {
128            return None;
129        };
130
131        let pagination = event_cache.pagination();
132
133        let mut status = pagination.status();
134
135        let current_value = self.controller.map_pagination_status(status.next_now()).await;
136
137        let controller = self.controller.clone();
138        let stream = Box::pin(stream! {
139            let status_stream = status.dedup();
140
141            pin_mut!(status_stream);
142
143            while let Some(state) = status_stream.next().await {
144                let state = controller.map_pagination_status(state).await;
145
146                match state {
147                    PaginationStatus::Idle { hit_timeline_start } => {
148                        if hit_timeline_start {
149                            controller.insert_timeline_start_if_missing().await;
150                        }
151                    }
152                    PaginationStatus::Paginating => {}
153                }
154
155                yield state;
156            }
157        });
158
159        Some((current_value, stream))
160    }
161}