// matrix_sdk/event_cache/caches/room/pagination.rs

use std::{
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use eyeball::{SharedObservable, Subscriber};
use eyeball_im::VectorDiff;
use futures_core::{Stream, ready};
use matrix_sdk_base::{
    event_cache::{Event, Gap},
    linked_chunk::{ChunkContent, LinkedChunkId, Update},
};
use pin_project_lite::pin_project;
use ruma::api::Direction;
use tracing::{error, trace};

pub use super::super::pagination::PaginationStatus;
use super::{
    super::{
        super::{
            EventCacheError, EventsOrigin, Result, RoomEventCacheGenericUpdate,
            deduplicator::{DeduplicationOutcome, filter_duplicate_events},
        },
        TimelineVectorDiffs,
        pagination::{
            BackPaginationOutcome, LoadMoreEventsBackwardsOutcome, PaginatedCache, Pagination,
        },
    },
    PostProcessingOrigin, RoomEventCacheInner, RoomEventCacheUpdate,
};
use crate::{event_cache::caches::pagination::SharedPaginationStatus, room::MessagesOptions};
52
pin_project! {
    /// A stream of [`PaginationStatus`] updates for a room, produced by
    /// mapping the internal [`SharedPaginationStatus`] observable into the
    /// public status type.
    pub struct PaginationStatusSubscriber {
        // The underlying eyeball subscriber; pinned because its `poll_next`
        // takes `Pin<&mut Self>` (see the `Stream` impl below).
        #[pin]
        subscriber: Subscriber<SharedPaginationStatus>,
    }
}
63
64#[cfg(not(tarpaulin_include))]
65impl std::fmt::Debug for PaginationStatusSubscriber {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 f.debug_struct("PaginationStatusSubscriber").finish_non_exhaustive()
68 }
69}
70
impl PaginationStatusSubscriber {
    /// Convert the internal [`SharedPaginationStatus`] into the public
    /// [`PaginationStatus`], dropping any payload carried by the
    /// `Paginating` variant.
    fn map(from: SharedPaginationStatus) -> PaginationStatus {
        match from {
            SharedPaginationStatus::Idle { hit_timeline_start } => {
                PaginationStatus::Idle { hit_timeline_start }
            }
            SharedPaginationStatus::Paginating { .. } => PaginationStatus::Paginating,
        }
    }

    /// Get the current pagination status.
    pub fn get(&self) -> PaginationStatus {
        Self::map(self.subscriber.get())
    }

    /// Wait for the next status update.
    ///
    /// Returns `None` when the underlying observable yields `None`
    /// (presumably when it has been dropped — behavior defined by
    /// [`Subscriber::next`]).
    pub async fn next(&mut self) -> Option<PaginationStatus> {
        self.subscriber.next().await.map(Self::map)
    }

    /// Return the current status immediately, without waiting for an update
    /// (delegates to [`Subscriber::next_now`]).
    pub fn next_now(&mut self) -> PaginationStatus {
        Self::map(self.subscriber.next_now())
    }
}
93
impl Stream for PaginationStatusSubscriber {
    type Item = PaginationStatus;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Forward the poll to the pinned inner subscriber; once a shared
        // status is ready, map it into the public `PaginationStatus`.
        Poll::Ready(ready!(self.project().subscriber.as_mut().poll_next(cx)).map(Self::map))
    }
}
101
/// Public handle for running backwards pagination on a single room's event
/// cache; a thin, cloneable wrapper around the generic [`Pagination`] driver.
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct RoomPagination(Pagination<Arc<RoomEventCacheInner>>);
111
impl RoomPagination {
    /// Create a new [`RoomPagination`] driving the given room event cache.
    pub(super) fn new(cache: Arc<RoomEventCacheInner>) -> Self {
        Self(Pagination::new(cache))
    }

    /// Run backwards pagination driven by `num_requested_events`.
    ///
    /// Exact stopping semantics are defined by the inner
    /// [`Pagination::run_backwards_until`].
    pub async fn run_backwards_until(
        &self,
        num_requested_events: u16,
    ) -> Result<BackPaginationOutcome> {
        self.0.run_backwards_until(num_requested_events).await
    }

    /// Run a single backwards pagination step with the given batch size.
    pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
        self.0.run_backwards_once(batch_size).await
    }

    /// Subscribe to this room's pagination status updates.
    pub fn status(&self) -> PaginationStatusSubscriber {
        PaginationStatusSubscriber { subscriber: self.0.cache.status().subscribe() }
    }

    // Test-only pass-through to the cache's storage-backed backwards loading.
    #[cfg(test)]
    pub(super) async fn load_more_events_backwards(
        &self,
    ) -> Result<LoadMoreEventsBackwardsOutcome> {
        self.0.cache.load_more_events_backwards().await
    }
}
155
/// Implementation of the generic pagination backend for a room's event cache.
impl PaginatedCache for Arc<RoomEventCacheInner> {
    /// The shared observable holding the current pagination status.
    fn status(&self) -> &SharedObservable<SharedPaginationStatus> {
        &self.shared_pagination_status
    }

    /// Try to load one more chunk of events backwards, from storage.
    ///
    /// Returns either a `Gap` (with an optional previous-batch token to use
    /// against the network), `StartOfTimeline`, or `Events` reloaded from
    /// disk along with the resulting timeline diffs.
    async fn load_more_events_backwards(&self) -> Result<LoadMoreEventsBackwardsOutcome> {
        let mut state = self.state.write().await?;

        // If the in-memory linked chunk already contains a gap, its token can
        // be used right away for a network back-pagination; no need to touch
        // the store. (NOTE(review): `rgap` presumably returns the rearmost
        // gap chunk — confirm against `room_linked_chunk`'s docs.)
        if let Some(prev_token) = state.room_linked_chunk().rgap().map(|gap| gap.token) {
            return Ok(LoadMoreEventsBackwardsOutcome::Gap {
                prev_token: Some(prev_token),
                waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
            });
        }

        let prev_first_chunk =
            state.room_linked_chunk().chunks().next().expect("a linked chunk is never empty");

        // Ask the store for the chunk that precedes the current first chunk.
        let linked_chunk_id = LinkedChunkId::Room(&state.state.room_id);
        let new_first_chunk = match state
            .store
            .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
            .await
        {
            Ok(Some(new_first_chunk)) => {
                // A previous chunk exists on disk: insert it below.
                new_first_chunk
            }

            Ok(None) => {
                // Nothing left on disk. If the fully-loaded chunk contains at
                // least one event, we've hit the start of the timeline.
                if state.room_linked_chunk().events().next().is_some() {
                    trace!("chunk is fully loaded and non-empty: reached_start=true");
                    return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline;
                }

                // Fully loaded but empty: fall back to a network pagination
                // without a previous-batch token.
                return Ok(LoadMoreEventsBackwardsOutcome::Gap {
                    prev_token: None,
                    waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
                });
            }

            Err(err) => {
                error!("error when loading the previous chunk of a linked chunk: {err}");

                // The storage for this linked chunk may be inconsistent:
                // clear it entirely before propagating the error.
                state
                    .store
                    .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                    .await?;

                return Err(err.into());
            }
        };

        let chunk_content = new_first_chunk.content.clone();

        // If the reloaded chunk has no predecessor, we've reached the start
        // of the persisted timeline.
        let reached_start = new_first_chunk.previous.is_none();

        if let Err(err) = state.room_linked_chunk_mut().insert_new_chunk_as_first(new_first_chunk) {
            error!("error when inserting the previous chunk into its linked chunk: {err}");

            // Same recovery as above: clear the persisted linked chunk, then
            // bubble up the error.
            state
                .store
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(&state.state.room_id),
                    vec![Update::Clear],
                )
                .await?;

            return Err(err.into());
        }

        // Drop the pending store updates: the chunk we just inserted came
        // *from* the store, so persisting it again would be redundant.
        // (NOTE(review): inferred from the insert-from-store flow — confirm.)
        let _ = state.room_linked_chunk_mut().store_updates().take();

        // But do keep the corresponding in-memory timeline diffs, to notify
        // observers of the reloaded content.
        let timeline_event_diffs = state.room_linked_chunk_mut().updates_as_vector_diffs();

        Ok(match chunk_content {
            ChunkContent::Gap(gap) => {
                trace!("reloaded chunk from disk (gap)");

                LoadMoreEventsBackwardsOutcome::Gap {
                    prev_token: Some(gap.token),
                    waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
                }
            }

            ChunkContent::Items(events) => {
                trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());

                LoadMoreEventsBackwardsOutcome::Events {
                    events,
                    timeline_event_diffs,
                    reached_start,
                }
            }
        })
    }

    /// Record that we've waited at least once for an initial previous-batch
    /// token.
    async fn mark_has_waited_for_initial_prev_token(&self) -> Result<()> {
        *self.state.write().await?.waited_for_initial_prev_token_mut() = true;

        Ok(())
    }

    /// Wait until a previous-batch token becomes available (signalled via the
    /// cache's notifier).
    async fn wait_for_prev_token(&self) {
        self.pagination_batch_token_notifier.notified().await
    }

    /// Run one backwards `/messages` request against the server.
    ///
    /// Returns `Ok(None)` if the room handle has been dropped, otherwise the
    /// received events plus the next previous-batch token (`end`).
    async fn paginate_backwards_with_network(
        &self,
        batch_size: u16,
        prev_token: &Option<String>,
    ) -> Result<Option<(Vec<Event>, Option<String>)>> {
        // The room is held weakly; if it's gone, there is nothing to do.
        let Some(room) = self.weak_room.get() else {
            return Ok(None);
        };

        let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
        options.limit = batch_size.into();

        let response = room
            .messages(options)
            .await
            .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?;

        Ok(Some((response.chunk, response.end)))
    }

    /// Finish a backwards pagination whose events were reloaded from disk:
    /// forward the timeline diffs to observers and build the outcome.
    async fn conclude_backwards_pagination_from_disk(
        &self,
        events: Vec<Event>,
        timeline_event_diffs: Vec<VectorDiff<Event>>,
        reached_start: bool,
    ) -> BackPaginationOutcome {
        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Cache,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        BackPaginationOutcome {
            reached_start,
            // Reverse so the outcome's events are in the opposite order from
            // the on-disk chunk order (matching the network path's ordering).
            events: events.into_iter().rev().collect(),
        }
    }

    /// Finish a backwards pagination whose events came from the network:
    /// deduplicate, insert into the linked chunk, post-process, and notify.
    ///
    /// Returns `Ok(None)` when the gap we paginated from has disappeared in
    /// the meantime (the caller should restart).
    async fn conclude_backwards_pagination_from_network(
        &self,
        events: Vec<Event>,
        prev_token: Option<String>,
        mut new_token: Option<String>,
    ) -> Result<Option<BackPaginationOutcome>> {
        let mut state = self.state.write().await?;

        // Locate the gap chunk whose token matches the token we paginated
        // from; if it's gone (e.g. the cache changed under us), give up and
        // let the caller retry from scratch.
        let prev_gap_id = if let Some(token) = prev_token {
            let gap_chunk_id = state.room_linked_chunk().chunk_identifier(|chunk| {
                matches!(chunk.content(), ChunkContent::Gap(Gap { token: prev_token }) if *prev_token == token)
            });

            if gap_chunk_id.is_none() {
                return Ok(None);
            }

            gap_chunk_id
        } else {
            None
        };

        // Split the received events into fresh ones and duplicates already
        // present in memory or in the store.
        let DeduplicationOutcome {
            all_events: mut events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
            non_empty_all_duplicates: all_duplicates,
        } = {
            let room_linked_chunk = state.room_linked_chunk();

            filter_duplicate_events(
                &state.state.own_user_id,
                &state.store,
                LinkedChunkId::Room(&state.state.room_id),
                room_linked_chunk,
                events,
            )
            .await?
        };

        if !all_duplicates {
            // Remove the stale copies so the deduplicated events can be
            // (re-)inserted at their new position.
            state
                .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;
        } else {
            // Every received event was already known: insert nothing, and
            // drop the new token so we don't create a gap that would lead to
            // re-fetching the same events. (NOTE(review): inferred — confirm
            // this matches the duplicate-handling policy elsewhere.)
            events.clear();
            new_token = None;
        }

        // Backwards pagination returns events newest-first; reverse them into
        // topological (oldest-first) order for insertion. (Assumes the server
        // honors `Direction::Backward` ordering — TODO confirm.)
        let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();

        // Replace the consumed gap with the new events, followed by a new gap
        // if the server returned another previous-batch token.
        let new_gap = new_token.map(|prev_token| Gap { token: prev_token });
        let reached_start = state.room_linked_chunk_mut().push_backwards_pagination_events(
            prev_gap_id,
            new_gap,
            &topo_ordered_events,
        );

        // No read-receipt event is relevant for back-paginated (historical)
        // events.
        let receipt_event = None;

        state
            .post_process_new_events(
                topo_ordered_events,
                PostProcessingOrigin::Backpagination,
                receipt_event,
            )
            .await?;

        // Broadcast the resulting timeline diffs to observers, if any.
        let timeline_event_diffs = state.room_linked_chunk_mut().updates_as_vector_diffs();

        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Pagination,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        Ok(Some(BackPaginationOutcome { events, reached_start }))
    }
}