// matrix_sdk/event_cache/caches/room/pagination.rs

use std::sync::Arc;

use eyeball::{SharedObservable, Subscriber};
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
    event_cache::{Event, Gap},
    linked_chunk::{ChunkContent, LinkedChunkId},
};
use ruma::api::Direction;

pub use super::super::pagination::PaginationStatus;
use super::{
    super::{
        super::{
            EventCacheError, EventsOrigin, Result, RoomEventCacheGenericUpdate,
            deduplicator::{DeduplicationOutcome, filter_duplicate_events},
        },
        TimelineVectorDiffs,
        pagination::{
            BackPaginationOutcome, LoadMoreEventsBackwardsOutcome, PaginatedCache, Pagination,
        },
    },
    PostProcessingOrigin, RoomEventCacheInner, RoomEventCacheUpdate,
};
use crate::room::MessagesOptions;
/// Public handle to run back-pagination queries for a single room's event
/// cache.
///
/// This is a cloneable newtype around the generic [`Pagination`] driver,
/// specialised to the room's [`RoomEventCacheInner`]; every method simply
/// delegates to the inner driver.
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct RoomPagination(Pagination<Arc<RoomEventCacheInner>>);
55
56impl RoomPagination {
57 pub(in super::super::super) fn new(cache: Arc<RoomEventCacheInner>) -> Self {
59 Self(Pagination::new(cache))
60 }
61
62 pub async fn run_backwards_until(
73 &self,
74 num_requested_events: u16,
75 ) -> Result<BackPaginationOutcome> {
76 self.0.run_backwards_until(num_requested_events).await
77 }
78
79 pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
84 self.0.run_backwards_once(batch_size).await
85 }
86
87 pub fn status(&self) -> Subscriber<PaginationStatus> {
89 self.0.cache.status().subscribe()
90 }
91}
92
/// Implementation of the generic pagination contract for a room's event
/// cache: network fetching, disk/network conclusion, and the bits of shared
/// state ([`PaginationStatus`], prev-token notifications) the driver needs.
impl PaginatedCache for Arc<RoomEventCacheInner> {
    /// The shared observable tracking the pagination status for this room.
    fn status(&self) -> &SharedObservable<PaginationStatus> {
        &self.pagination_status
    }

    /// Loads more events backwards from storage, under the state write lock.
    async fn load_more_events_backwards(&self) -> Result<LoadMoreEventsBackwardsOutcome> {
        self.state.write().await?.load_more_events_backwards().await
    }

    /// Records that we have waited for an initial previous-batch token, so
    /// subsequent paginations don't wait again.
    async fn mark_has_waited_for_initial_prev_token(&self) -> Result<()> {
        *self.state.write().await?.waited_for_initial_prev_token() = true;

        Ok(())
    }

    /// Waits until another task signals (via the notifier) that a pagination
    /// batch token may be available.
    async fn wait_for_prev_token(&self) {
        self.pagination_batch_token_notifier.notified().await
    }

    /// Runs a single `/messages` request backwards over the network.
    ///
    /// Returns `Ok(None)` when the room handle is gone (the weak reference
    /// can't be upgraded), otherwise the fetched events and the response's
    /// `end` token (the next `prev_token`, `None` when the server reports no
    /// earlier events).
    async fn paginate_backwards_with_network(
        &self,
        batch_size: u16,
        prev_token: &Option<String>,
    ) -> Result<Option<(Vec<Event>, Option<String>)>> {
        // The room is only weakly held; if it has been dropped there is
        // nothing left to paginate.
        let Some(room) = self.weak_room.get() else {
            return Ok(None);
        };

        let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
        options.limit = batch_size.into();

        let response = room
            .messages(options)
            .await
            .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

        Ok(Some((response.chunk, response.end)))
    }

    /// Finalizes a back-pagination that was served entirely from the cache.
    ///
    /// Broadcasts the timeline diffs (tagged `EventsOrigin::Cache`) if any,
    /// then returns the outcome with `events` reversed — presumably to match
    /// the ordering contract of `BackPaginationOutcome::events`; confirm
    /// which end is most recent against the trait's documentation.
    async fn conclude_backwards_pagination_from_disk(
        &self,
        events: Vec<Event>,
        timeline_event_diffs: Vec<VectorDiff<Event>>,
        reached_start: bool,
    ) -> BackPaginationOutcome {
        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Cache,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        BackPaginationOutcome {
            reached_start,
            events: events.into_iter().rev().collect(),
        }
    }

    /// Finalizes a back-pagination served by the network: inserts `events`
    /// into the room's linked chunk in place of the gap identified by
    /// `prev_token`, deduplicating against both memory and the store, then
    /// post-processes the new events and broadcasts the resulting timeline
    /// diffs (tagged `EventsOrigin::Pagination`).
    ///
    /// Returns `Ok(None)` when `prev_token` was provided but no matching gap
    /// chunk exists anymore (the timeline changed while the request was in
    /// flight) — NOTE(review): presumably the caller then discards the
    /// response and restarts; confirm at the call site.
    async fn conclude_backwards_pagination_from_network(
        &self,
        events: Vec<Event>,
        prev_token: Option<String>,
        mut new_token: Option<String>,
    ) -> Result<Option<BackPaginationOutcome>> {
        // Hold the state write lock for the whole conclusion so the gap
        // lookup, dedup, insertion and post-processing happen atomically
        // w.r.t. other writers.
        let mut state = self.state.write().await?;

        // Locate the gap chunk whose stored token matches the token this
        // request was issued with.
        let prev_gap_id = if let Some(token) = prev_token {
            let gap_chunk_id = state.room_linked_chunk().chunk_identifier(|chunk| {
                matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
            });

            if gap_chunk_id.is_none() {
                // The gap we paginated from has disappeared: bail out and let
                // the caller handle the stale response.
                return Ok(None);
            }

            gap_chunk_id
        } else {
            None
        };

        // Split the incoming events into fresh ones and those already known
        // in memory or in the store. Scoped block: limits the immutable
        // borrow of `state` taken by `room_linked_chunk()`.
        let DeduplicationOutcome {
            all_events: mut events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
            non_empty_all_duplicates: all_duplicates,
        } = {
            let room_linked_chunk = state.room_linked_chunk();

            filter_duplicate_events(
                &state.state.own_user_id,
                &state.store,
                LinkedChunkId::Room(&state.state.room_id),
                room_linked_chunk,
                events,
            )
            .await?
        };

        if !all_duplicates {
            // Remove the previously-known copies so the freshly paginated
            // ones (inserted below) become the single representation.
            state
                .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;
        } else {
            // Every event was already known: insert nothing, and drop the new
            // gap token — NOTE(review): presumably to avoid re-creating a gap
            // over already-known history; confirm intent.
            events.clear();

            new_token = None;
        }

        // `/messages` with dir=b returns events newest-first; reverse to get
        // topological (oldest-first) order for insertion.
        let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();

        let new_gap = new_token.map(|prev_token| Gap { prev_token });
        let reached_start = state.room_linked_chunk_mut().push_backwards_pagination_events(
            prev_gap_id,
            new_gap,
            &topo_ordered_events,
        );

        state
            .post_process_new_events(topo_ordered_events, PostProcessingOrigin::Backpagination)
            .await?;

        // Drain the pending linked-chunk updates as vector diffs and forward
        // them to timeline subscribers.
        let timeline_event_diffs = state.room_linked_chunk_mut().updates_as_vector_diffs();

        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Pagination,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        Ok(Some(BackPaginationOutcome { events, reached_start }))
    }
}