matrix_sdk_ui/timeline/pagination.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_rx::StreamExt as _;
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::{StreamExt as _, pin_mut};
19use matrix_sdk::event_cache::{self, EventCacheError, RoomPaginationStatus};
20use tracing::{instrument, warn};
21
22use super::Error;
23use crate::timeline::{PaginationError::NotSupported, controller::TimelineFocusKind};
24
25impl super::Timeline {
26 /// Add more events to the start of the timeline.
27 ///
28 /// Returns whether we hit the start of the timeline.
29 #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
30 pub async fn paginate_backwards(&self, mut num_events: u16) -> Result<bool, Error> {
31 match self.controller.focus() {
32 TimelineFocusKind::Live { .. } => {
33 match self.controller.live_lazy_paginate_backwards(num_events).await {
34 Some(needed_num_events) => {
35 num_events = needed_num_events.try_into().expect(
36 "failed to cast `needed_num_events` (`usize`) into `num_events` (`usize`)",
37 );
38 }
39 None => {
40 // We could adjust the skip count to a lower value, while passing the
41 // requested number of events. We *may* have reached
42 // the start of the timeline, but since
43 // we're fulfilling the caller's request, assume it's not the case and
44 // return false here. A subsequent call will go to
45 // the `Some()` arm of this match, and cause a call
46 // to the event cache's pagination.
47 return Ok(false);
48 }
49 }
50
51 Ok(self.live_paginate_backwards(num_events).await?)
52 }
53 TimelineFocusKind::Event { .. } => {
54 Ok(self.controller.focused_paginate_backwards(num_events).await?)
55 }
56 TimelineFocusKind::Thread { root_event_id } => Ok(self
57 .event_cache
58 .paginate_thread_backwards(root_event_id.to_owned(), num_events)
59 .await?),
60 TimelineFocusKind::PinnedEvents { .. } => Err(Error::PaginationError(NotSupported)),
61 }
62 }
63
64 /// Add more events to the end of the timeline.
65 ///
66 /// Returns whether we hit the end of the timeline.
67 #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
68 pub async fn paginate_forwards(&self, num_events: u16) -> Result<bool, Error> {
69 if self.controller.is_live() {
70 Ok(true)
71 } else {
72 Ok(self.controller.focused_paginate_forwards(num_events).await?)
73 }
74 }
75
76 /// Paginate backwards in live mode.
77 ///
78 /// This can only be called when the timeline is in live mode, not focused
79 /// on a specific event.
80 ///
81 /// Returns whether we hit the start of the timeline.
82 async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result<bool> {
83 loop {
84 match self.event_cache.pagination().run_backwards_once(batch_size).await {
85 Ok(outcome) => {
86 // As an exceptional contract, restart the back-pagination if we received an
87 // empty chunk.
88 if outcome.reached_start || !outcome.events.is_empty() {
89 if outcome.reached_start {
90 self.controller.insert_timeline_start_if_missing().await;
91 }
92 return Ok(outcome.reached_start);
93 }
94 }
95
96 Err(EventCacheError::AlreadyBackpaginating) => {
97 // Treat an already running pagination exceptionally, returning false so that
98 // the caller retries later.
99 warn!("Another pagination request is already happening, returning early");
100 return Ok(false);
101 }
102
103 // Propagate other errors as such.
104 Err(err) => return Err(err),
105 }
106 }
107 }
108
109 /// Subscribe to the back-pagination status of a live timeline.
110 ///
111 /// This will return `None` if the timeline is in the focused mode.
112 ///
113 /// Note: this may send multiple Paginating/Idle sequences during a single
114 /// call to [`Self::paginate_backwards()`].
115 pub async fn live_back_pagination_status(
116 &self,
117 ) -> Option<(RoomPaginationStatus, impl Stream<Item = RoomPaginationStatus> + use<>)> {
118 if !self.controller.is_live() {
119 return None;
120 }
121
122 let pagination = self.event_cache.pagination();
123
124 let mut status = pagination.status();
125
126 let current_value = self.controller.map_pagination_status(status.next_now()).await;
127
128 let controller = self.controller.clone();
129 let stream = Box::pin(stream! {
130 let status_stream = status.dedup();
131
132 pin_mut!(status_stream);
133
134 while let Some(state) = status_stream.next().await {
135 let state = controller.map_pagination_status(state).await;
136
137 match state {
138 RoomPaginationStatus::Idle { hit_timeline_start } => {
139 if hit_timeline_start {
140 controller.insert_timeline_start_if_missing().await;
141 }
142 }
143 RoomPaginationStatus::Paginating => {}
144 }
145
146 yield state;
147 }
148 });
149
150 Some((current_value, stream))
151 }
152}