matrix_sdk_ui/timeline/pagination.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_rx::StreamExt as _;
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::{pin_mut, StreamExt as _};
19use matrix_sdk::event_cache::{self, EventCacheError, RoomPaginationStatus};
20use tracing::{instrument, warn};
21
22use super::Error;
23
24impl super::Timeline {
25 /// Add more events to the start of the timeline.
26 ///
27 /// Returns whether we hit the start of the timeline.
28 #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
29 pub async fn paginate_backwards(&self, mut num_events: u16) -> Result<bool, Error> {
30 if self.controller.is_live().await {
31 match self.controller.live_lazy_paginate_backwards(num_events).await {
32 Some(needed_num_events) => {
33 num_events = needed_num_events.try_into().expect(
34 "failed to cast `needed_num_events` (`usize`) into `num_events` (`usize`)",
35 );
36 }
37 None => {
38 // We could adjust the skip count to a lower value, while passing the requested
39 // number of events. We *may* have reached the start of the timeline, but since
40 // we're fulfilling the caller's request, assume it's not the case and return
41 // false here. A subsequent call will go to the `Some()` arm of this match, and
42 // cause a call to the event cache's pagination.
43 return Ok(false);
44 }
45 }
46
47 Ok(self.live_paginate_backwards(num_events).await?)
48 } else {
49 Ok(self.controller.focused_paginate_backwards(num_events).await?)
50 }
51 }
52
53 /// Add more events to the end of the timeline.
54 ///
55 /// Returns whether we hit the end of the timeline.
56 #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
57 pub async fn paginate_forwards(&self, num_events: u16) -> Result<bool, Error> {
58 if self.controller.is_live().await {
59 Ok(true)
60 } else {
61 Ok(self.controller.focused_paginate_forwards(num_events).await?)
62 }
63 }
64
65 /// Paginate backwards in live mode.
66 ///
67 /// This can only be called when the timeline is in live mode, not focused
68 /// on a specific event.
69 ///
70 /// Returns whether we hit the start of the timeline.
71 async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result<bool> {
72 loop {
73 match self.event_cache.pagination().run_backwards_once(batch_size).await {
74 Ok(outcome) => {
75 // As an exceptional contract, restart the back-pagination if we received an
76 // empty chunk.
77 if outcome.reached_start || !outcome.events.is_empty() {
78 if outcome.reached_start {
79 self.controller.insert_timeline_start_if_missing().await;
80 }
81 return Ok(outcome.reached_start);
82 }
83 }
84
85 Err(EventCacheError::AlreadyBackpaginating) => {
86 // Treat an already running pagination exceptionally, returning false so that
87 // the caller retries later.
88 warn!("Another pagination request is already happening, returning early");
89 return Ok(false);
90 }
91
92 // Propagate other errors as such.
93 Err(err) => return Err(err),
94 }
95 }
96 }
97
98 /// Subscribe to the back-pagination status of a live timeline.
99 ///
100 /// This will return `None` if the timeline is in the focused mode.
101 ///
102 /// Note: this may send multiple Paginating/Idle sequences during a single
103 /// call to [`Self::paginate_backwards()`].
104 pub async fn live_back_pagination_status(
105 &self,
106 ) -> Option<(RoomPaginationStatus, impl Stream<Item = RoomPaginationStatus>)> {
107 if !self.controller.is_live().await {
108 return None;
109 }
110
111 let pagination = self.event_cache.pagination();
112
113 let mut status = pagination.status();
114
115 let current_value = status.next_now();
116
117 let controller = self.controller.clone();
118 let stream = Box::pin(stream! {
119 let status_stream = status.dedup();
120
121 pin_mut!(status_stream);
122
123 while let Some(state) = status_stream.next().await {
124 match state {
125 RoomPaginationStatus::Idle { hit_timeline_start } => {
126 if hit_timeline_start {
127 controller.insert_timeline_start_if_missing().await;
128 }
129 }
130 RoomPaginationStatus::Paginating => {}
131 }
132
133 yield state;
134 }
135 });
136
137 Some((current_value, stream))
138 }
139}