matrix_sdk_ui/timeline/
pagination.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_rx::StreamExt as _;
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::{pin_mut, StreamExt as _};
19use matrix_sdk::event_cache::{self, EventCacheError, RoomPaginationStatus};
20use tracing::{instrument, warn};
21
22use super::Error;
23
24impl super::Timeline {
25    /// Add more events to the start of the timeline.
26    ///
27    /// Returns whether we hit the start of the timeline.
28    #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
29    pub async fn paginate_backwards(&self, mut num_events: u16) -> Result<bool, Error> {
30        if self.controller.is_live().await {
31            match self.controller.live_lazy_paginate_backwards(num_events).await {
32                Some(needed_num_events) => {
33                    num_events = needed_num_events.try_into().expect(
34                        "failed to cast `needed_num_events` (`usize`) into `num_events` (`usize`)",
35                    );
36                }
37                None => {
38                    // We could adjust the skip count to a lower value, while passing the requested
39                    // number of events. We *may* have reached the start of the timeline, but since
40                    // we're fulfilling the caller's request, assume it's not the case and return
41                    // false here. A subsequent call will go to the `Some()` arm of this match, and
42                    // cause a call to the event cache's pagination.
43                    return Ok(false);
44                }
45            }
46
47            Ok(self.live_paginate_backwards(num_events).await?)
48        } else {
49            Ok(self.controller.focused_paginate_backwards(num_events).await?)
50        }
51    }
52
53    /// Add more events to the end of the timeline.
54    ///
55    /// Returns whether we hit the end of the timeline.
56    #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
57    pub async fn paginate_forwards(&self, num_events: u16) -> Result<bool, Error> {
58        if self.controller.is_live().await {
59            Ok(true)
60        } else {
61            Ok(self.controller.focused_paginate_forwards(num_events).await?)
62        }
63    }
64
65    /// Paginate backwards in live mode.
66    ///
67    /// This can only be called when the timeline is in live mode, not focused
68    /// on a specific event.
69    ///
70    /// Returns whether we hit the start of the timeline.
71    async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result<bool> {
72        loop {
73            match self.event_cache.pagination().run_backwards_once(batch_size).await {
74                Ok(outcome) => {
75                    // As an exceptional contract, restart the back-pagination if we received an
76                    // empty chunk.
77                    if outcome.reached_start || !outcome.events.is_empty() {
78                        if outcome.reached_start {
79                            self.controller.insert_timeline_start_if_missing().await;
80                        }
81                        return Ok(outcome.reached_start);
82                    }
83                }
84
85                Err(EventCacheError::AlreadyBackpaginating) => {
86                    // Treat an already running pagination exceptionally, returning false so that
87                    // the caller retries later.
88                    warn!("Another pagination request is already happening, returning early");
89                    return Ok(false);
90                }
91
92                // Propagate other errors as such.
93                Err(err) => return Err(err),
94            }
95        }
96    }
97
98    /// Subscribe to the back-pagination status of a live timeline.
99    ///
100    /// This will return `None` if the timeline is in the focused mode.
101    ///
102    /// Note: this may send multiple Paginating/Idle sequences during a single
103    /// call to [`Self::paginate_backwards()`].
104    pub async fn live_back_pagination_status(
105        &self,
106    ) -> Option<(RoomPaginationStatus, impl Stream<Item = RoomPaginationStatus>)> {
107        if !self.controller.is_live().await {
108            return None;
109        }
110
111        let pagination = self.event_cache.pagination();
112
113        let mut status = pagination.status();
114
115        let current_value = status.next_now();
116
117        let controller = self.controller.clone();
118        let stream = Box::pin(stream! {
119            let status_stream = status.dedup();
120
121            pin_mut!(status_stream);
122
123            while let Some(state) = status_stream.next().await {
124                match state {
125                    RoomPaginationStatus::Idle { hit_timeline_start } => {
126                        if hit_timeline_start {
127                            controller.insert_timeline_start_if_missing().await;
128                        }
129                    }
130                    RoomPaginationStatus::Paginating => {}
131                }
132
133                yield state;
134            }
135        });
136
137        Some((current_value, stream))
138    }
139}