matrix_sdk_ui/timeline/
pagination.rs1use async_rx::StreamExt as _;
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::{StreamExt as _, pin_mut};
19use matrix_sdk::event_cache::PaginationStatus;
20use tracing::instrument;
21
22use super::Error;
23use crate::timeline::{
24 PaginationError::{self, NotSupported},
25 controller::TimelineFocusKind,
26};
27
28impl super::Timeline {
29 #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
33 pub async fn paginate_backwards(&self, mut num_events: u16) -> Result<bool, Error> {
34 match self.controller.focus() {
35 TimelineFocusKind::Live { .. } => {
36 match self.controller.live_lazy_paginate_backwards(num_events).await {
37 Some(needed_num_events) => {
38 num_events = needed_num_events.try_into().expect(
39 "failed to cast `needed_num_events` (`usize`) into `num_events` (`usize`)",
40 );
41 }
42 None => {
43 return Ok(false);
50 }
51 }
52
53 Ok(self.live_paginate_backwards(num_events).await?)
54 }
55
56 TimelineFocusKind::Event { focused_event_id, thread_mode, .. } => Ok(self
57 .event_cache
58 .get_event_focused_cache(focused_event_id.clone(), (*thread_mode).into())
59 .await?
60 .ok_or(PaginationError::MissingCache)?
61 .paginate_backwards(num_events)
62 .await?
63 .hit_end_of_timeline),
64
65 TimelineFocusKind::Thread { root_event_id } => Ok(self
66 .event_cache
67 .thread_pagination(root_event_id.to_owned())
68 .await
69 .map_err(PaginationError::EventCache)?
70 .run_backwards_once(num_events)
71 .await
72 .map(|outcome| outcome.reached_start)?),
73
74 TimelineFocusKind::PinnedEvents => Err(Error::PaginationError(NotSupported)),
75 }
76 }
77
78 #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
82 pub async fn paginate_forwards(&self, num_events: u16) -> Result<bool, Error> {
83 match self.controller.focus() {
84 TimelineFocusKind::Live { .. } => Ok(true),
85
86 TimelineFocusKind::Event { focused_event_id, thread_mode, .. } => Ok(self
87 .event_cache
88 .get_event_focused_cache(focused_event_id.clone(), (*thread_mode).into())
89 .await?
90 .ok_or(PaginationError::MissingCache)?
91 .paginate_forwards(num_events)
92 .await?
93 .hit_end_of_timeline),
94
95 TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents => {
96 Err(Error::PaginationError(NotSupported))
97 }
98 }
99 }
100
101 async fn live_paginate_backwards(&self, batch_size: u16) -> Result<bool, Error> {
108 loop {
109 match self.event_cache.pagination().run_backwards_once(batch_size).await {
110 Ok(outcome) => {
111 if outcome.reached_start {
112 self.controller.insert_timeline_start_if_missing().await;
113 return Ok(true);
114 }
115
116 if !outcome.events.is_empty() {
117 return Ok(false);
118 }
119
120 }
123
124 Err(err) => return Err(err.into()),
126 }
127 }
128 }
129
130 pub async fn live_back_pagination_status(
137 &self,
138 ) -> Option<(PaginationStatus, impl Stream<Item = PaginationStatus> + use<>)> {
139 if !self.controller.is_live() {
140 return None;
141 }
142
143 let pagination = self.event_cache.pagination();
144
145 let mut status = pagination.status();
146
147 let current_value = self.controller.map_pagination_status(status.next_now()).await;
148
149 let controller = self.controller.clone();
150 let stream = Box::pin(stream! {
151 let status_stream = status.dedup();
152
153 pin_mut!(status_stream);
154
155 while let Some(state) = status_stream.next().await {
156 let state = controller.map_pagination_status(state).await;
157
158 match state {
159 PaginationStatus::Idle { hit_timeline_start } => {
160 if hit_timeline_start {
161 controller.insert_timeline_start_if_missing().await;
162 }
163 }
164 PaginationStatus::Paginating => {}
165 }
166
167 yield state;
168 }
169 });
170
171 Some((current_value, stream))
172 }
173}