matrix_sdk_ui/timeline/
pagination.rs1use async_rx::StreamExt as _;
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::{StreamExt as _, pin_mut};
19use matrix_sdk::event_cache::{PaginationStatus, RoomPagination};
20use tracing::instrument;
21
22use super::Error;
23use crate::timeline::{PaginationError::NotSupported, controller::TimelineFocusKind};
24
25impl super::Timeline {
26 #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
30 pub async fn paginate_backwards(&self, mut num_events: u16) -> Result<bool, Error> {
31 match self.controller.focus() {
32 TimelineFocusKind::Live { event_cache, .. } => {
33 match self.controller.live_lazy_paginate_backwards(num_events).await {
34 Some(needed_num_events) => {
35 num_events = needed_num_events.try_into().expect(
36 "failed to cast `needed_num_events` (`usize`) into `num_events` (`usize`)",
37 );
38 }
39 None => {
40 return Ok(false);
47 }
48 }
49
50 Ok(self.live_paginate_backwards(&event_cache.pagination(), num_events).await?)
51 }
52
53 TimelineFocusKind::Event { event_cache, .. } => {
54 Ok(event_cache.paginate_backwards(num_events).await?.hit_end_of_timeline)
55 }
56
57 TimelineFocusKind::Thread { event_cache, .. } => Ok(event_cache
58 .pagination()
59 .run_backwards_once(num_events)
60 .await
61 .map(|outcome| outcome.reached_start)?),
62
63 TimelineFocusKind::PinnedEvents { .. } => Err(Error::PaginationError(NotSupported)),
64 }
65 }
66
67 #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
71 pub async fn paginate_forwards(&self, num_events: u16) -> Result<bool, Error> {
72 match self.controller.focus() {
73 TimelineFocusKind::Live { .. } => Ok(true),
74
75 TimelineFocusKind::Event { event_cache, .. } => {
76 Ok(event_cache.paginate_forwards(num_events).await?.hit_end_of_timeline)
77 }
78
79 TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => {
80 Err(Error::PaginationError(NotSupported))
81 }
82 }
83 }
84
85 async fn live_paginate_backwards(
92 &self,
93 event_cache_pagination: &RoomPagination,
94 batch_size: u16,
95 ) -> Result<bool, Error> {
96 loop {
97 match event_cache_pagination.run_backwards_once(batch_size).await {
98 Ok(outcome) => {
99 if outcome.reached_start {
100 self.controller.insert_timeline_start_if_missing().await;
101 return Ok(true);
102 }
103
104 if !outcome.events.is_empty() {
105 return Ok(false);
106 }
107
108 }
111
112 Err(err) => return Err(err.into()),
114 }
115 }
116 }
117
118 pub async fn live_back_pagination_status(
125 &self,
126 ) -> Option<(PaginationStatus, impl Stream<Item = PaginationStatus> + use<>)> {
127 let TimelineFocusKind::Live { event_cache, .. } = self.controller.focus() else {
128 return None;
129 };
130
131 let pagination = event_cache.pagination();
132
133 let mut status = pagination.status();
134
135 let current_value = self.controller.map_pagination_status(status.next_now()).await;
136
137 let controller = self.controller.clone();
138 let stream = Box::pin(stream! {
139 let status_stream = status.dedup();
140
141 pin_mut!(status_stream);
142
143 while let Some(state) = status_stream.next().await {
144 let state = controller.map_pagination_status(state).await;
145
146 match state {
147 PaginationStatus::Idle { hit_timeline_start } => {
148 if hit_timeline_start {
149 controller.insert_timeline_start_if_missing().await;
150 }
151 }
152 PaginationStatus::Paginating => {}
153 }
154
155 yield state;
156 }
157 });
158
159 Some((current_value, stream))
160 }
161}