1use std::{sync::Arc, time::Duration};
18
19use eyeball::Subscriber;
20use matrix_sdk_base::timeout::timeout;
21use matrix_sdk_common::linked_chunk::ChunkContent;
22use tracing::{debug, instrument, trace};
23
24use super::{
25 paginator::{PaginationResult, PaginatorState},
26 room::{
27 events::{Gap, RoomEvents},
28 LoadMoreEventsBackwardsOutcome, RoomEventCacheInner,
29 },
30 BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheUpdate,
31};
32
33#[allow(missing_debug_implementations)]
37#[derive(Clone)]
38pub struct RoomPagination {
39 pub(super) inner: Arc<RoomEventCacheInner>,
40}
41
42impl RoomPagination {
43 #[instrument(skip(self))]
54 pub async fn run_backwards_until(
55 &self,
56 num_requested_events: u16,
57 ) -> Result<BackPaginationOutcome> {
58 let mut events = Vec::new();
59
60 loop {
61 if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
62 events.extend(outcome.events);
63 if outcome.reached_start || events.len() >= num_requested_events as usize {
64 return Ok(BackPaginationOutcome {
65 reached_start: outcome.reached_start,
66 events,
67 });
68 }
69 trace!("restarting back-pagination, because we haven't reached the start or obtained enough events yet");
70 }
71
72 debug!("restarting back-pagination because of a timeline reset.");
73 }
74 }
75
76 #[instrument(skip(self))]
81 pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
82 loop {
83 if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
84 return Ok(outcome);
85 }
86 debug!("restarting back-pagination because of a timeline reset.");
87 }
88 }
89
90 async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
91 const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
92
93 match self.inner.state.write().await.load_more_events_backwards().await? {
102 LoadMoreEventsBackwardsOutcome::Gap => {
103 }
105
106 LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
107 return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }))
108 }
109
110 LoadMoreEventsBackwardsOutcome::Events(events, sync_timeline_events_diffs) => {
111 if !sync_timeline_events_diffs.is_empty() {
112 let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
113 diffs: sync_timeline_events_diffs,
114 origin: EventsOrigin::Pagination,
115 });
116 }
117
118 return Ok(Some(BackPaginationOutcome {
119 reached_start: false,
120 events: events.into_iter().rev().collect(),
123 }));
124 }
125 }
126
127 let prev_token = self.get_or_wait_for_token(Some(DEFAULT_WAIT_FOR_TOKEN_DURATION)).await;
130
131 let prev_token = match prev_token {
132 PaginationToken::HasMore(token) => Some(token),
133 PaginationToken::None => None,
134 PaginationToken::HitEnd => {
135 debug!("Not back-paginating since we've reached the start of the timeline.");
136 return Ok(Some(BackPaginationOutcome { reached_start: true, events: Vec::new() }));
137 }
138 };
139
140 let paginator = &self.inner.paginator;
141
142 paginator.set_idle_state(PaginatorState::Idle, prev_token.clone(), None)?;
143
144 let PaginationResult { events, hit_end_of_timeline: reached_start } =
146 paginator.paginate_backward(batch_size.into()).await?;
147
148 let mut state = self.inner.state.write().await;
151
152 let prev_gap_id = if let Some(token) = prev_token {
155 let gap_id = state.events().chunk_identifier(|chunk| {
156 matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
157 });
158
159 if gap_id.is_none() {
164 return Ok(None);
165 }
166
167 gap_id
168 } else {
169 None
170 };
171
172 let new_gap = paginator.prev_batch_token().map(|prev_token| Gap { prev_token });
174
175 let (mut events, duplicated_event_ids, all_deduplicated) =
176 state.collect_valid_and_duplicated_events(events).await?;
177
178 if !all_deduplicated {
182 events.retain(|new_event| {
184 new_event
185 .event_id()
186 .map(|event_id| !duplicated_event_ids.contains(&event_id))
187 .unwrap_or(false)
188 });
189 } else {
190 events.clear();
192 }
193
194 let ((), sync_timeline_events_diffs) = state
195 .with_events_mut(|room_events| {
196 let reversed_events = events
200 .iter()
201 .rev()
202 .cloned()
203 .collect::<Vec<_>>();
204
205 let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos);
206
207 let insert_new_gap_pos = if let Some(gap_id) = prev_gap_id {
209 if all_deduplicated {
211 trace!("removing previous gap, as all events have been deduplicated");
214 room_events.remove_gap_at(gap_id).expect("gap identifier is a valid gap chunk id we read previously")
215 } else {
216 trace!("replacing previous gap with the back-paginated events");
217
218 room_events.replace_gap_at(reversed_events.clone(), gap_id)
220 .expect("gap_identifier is a valid chunk id we read previously")
221 }
222 } else if let Some(pos) = first_event_pos {
223 trace!("inserted events before the first known event");
226
227 room_events
228 .insert_events_at(reversed_events.clone(), pos)
229 .expect("pos is a valid position we just read above");
230
231 Some(pos)
232 } else {
233 trace!("pushing events received from back-pagination");
235
236 room_events.push_events(reversed_events.clone());
237
238 room_events.events().next().map(|(item_pos, _)| item_pos)
240 };
241
242 if !all_deduplicated {
247 if let Some(new_gap) = new_gap {
248 if let Some(new_pos) = insert_new_gap_pos {
249 room_events
250 .insert_gap_at(new_gap, new_pos)
251 .expect("events_chunk_pos represents a valid chunk position");
252 } else {
253 room_events.push_gap(new_gap);
254 }
255 }
256 } else {
257 debug!("not storing previous batch token, because we deduplicated all new back-paginated events");
258 }
259
260 room_events.on_new_events(&self.inner.room_version, reversed_events.iter());
261 })
262 .await?;
263
264 let backpagination_outcome = BackPaginationOutcome { events, reached_start };
265
266 if !sync_timeline_events_diffs.is_empty() {
267 let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
268 diffs: sync_timeline_events_diffs,
269 origin: EventsOrigin::Pagination,
270 });
271 }
272
273 Ok(Some(backpagination_outcome))
274 }
275
276 #[doc(hidden)]
282 pub async fn get_or_wait_for_token(&self, wait_time: Option<Duration>) -> PaginationToken {
283 fn get_latest(events: &RoomEvents) -> Option<String> {
284 events.rchunks().find_map(|chunk| match chunk.content() {
285 ChunkContent::Gap(gap) => Some(gap.prev_token.clone()),
286 ChunkContent::Items(..) => None,
287 })
288 }
289
290 {
291 let state = self.inner.state.read().await;
293
294 let has_events = state.events().events().next().is_some();
299
300 if let Some(found) = get_latest(state.events()) {
302 return PaginationToken::HasMore(found);
303 }
304
305 if has_events {
308 return PaginationToken::HitEnd;
309 }
310
311 if state.waited_for_initial_prev_token {
314 return PaginationToken::None;
315 }
316 }
317
318 let Some(wait_time) = wait_time else {
320 return PaginationToken::None;
321 };
322
323 let _ = timeout(self.inner.pagination_batch_token_notifier.notified(), wait_time).await;
327
328 let mut state = self.inner.state.write().await;
329
330 state.waited_for_initial_prev_token = true;
331
332 if let Some(token) = get_latest(state.events()) {
333 PaginationToken::HasMore(token)
334 } else if state.events().events().next().is_some() {
335 PaginationToken::HitEnd
337 } else {
338 PaginationToken::None
339 }
340 }
341
342 pub fn status(&self) -> Subscriber<PaginatorState> {
345 self.inner.paginator.state()
346 }
347
348 pub fn hit_timeline_start(&self) -> bool {
353 self.inner.paginator.hit_timeline_start()
354 }
355
356 pub fn hit_timeline_end(&self) -> bool {
361 self.inner.paginator.hit_timeline_end()
362 }
363}
364
365#[derive(Clone, Debug, PartialEq)]
367pub enum PaginationToken {
368 None,
371 HasMore(String),
374 HitEnd,
377}
378
379impl From<Option<String>> for PaginationToken {
380 fn from(token: Option<String>) -> Self {
381 match token {
382 Some(val) => Self::HasMore(val),
383 None => Self::None,
384 }
385 }
386}
387
388#[cfg(test)]
389mod tests {
390 #[cfg(not(target_arch = "wasm32"))]
392 mod time_tests {
393 use std::time::{Duration, Instant};
394
395 use assert_matches::assert_matches;
396 use matrix_sdk_base::RoomState;
397 use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE};
398 use ruma::{event_id, room_id, user_id};
399 use tokio::{spawn, time::sleep};
400
401 use crate::{
402 event_cache::{pagination::PaginationToken, room::events::Gap},
403 test_utils::logged_in_client,
404 };
405
406 #[async_test]
407 async fn test_wait_no_pagination_token() {
408 let client = logged_in_client(None).await;
409 let room_id = room_id!("!galette:saucisse.bzh");
410 client.base_client().get_or_create_room(room_id, RoomState::Joined);
411
412 let event_cache = client.event_cache();
413
414 event_cache.subscribe().unwrap();
415
416 let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
417
418 let pagination = room_event_cache.pagination();
419
420 let found = pagination.get_or_wait_for_token(None).await;
423 assert_matches!(found, PaginationToken::None);
425
426 let _ = pagination.inner.state.write().await.reset().await.unwrap();
428
429 let before = Instant::now();
431 let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
432 let waited = before.elapsed();
433 assert_matches!(found, PaginationToken::None);
435 assert!(waited.as_secs() < 1);
437
438 let _ = pagination.inner.state.write().await.reset().await.unwrap();
440
441 let before = Instant::now();
443 let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
444 let waited = before.elapsed();
445 assert_matches!(found, PaginationToken::None);
447 assert!(waited.as_secs() < 2);
449 assert!(waited.as_secs() >= 1);
450 }
451
452 #[async_test]
453 async fn test_wait_hit_end_of_timeline() {
454 let client = logged_in_client(None).await;
455 let room_id = room_id!("!galette:saucisse.bzh");
456 client.base_client().get_or_create_room(room_id, RoomState::Joined);
457
458 let event_cache = client.event_cache();
459
460 event_cache.subscribe().unwrap();
461
462 let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
463
464 let f = EventFactory::new().room(room_id).sender(*ALICE);
465 let pagination = room_event_cache.pagination();
466
467 room_event_cache
469 .inner
470 .state
471 .write()
472 .await
473 .with_events_mut(|events| {
474 events.push_events([f
475 .text_msg("this is the start of the timeline")
476 .into_event()]);
477 })
478 .await
479 .unwrap();
480
481 let found = pagination.get_or_wait_for_token(None).await;
484 assert_matches!(found, PaginationToken::HitEnd);
486
487 let before = Instant::now();
489 let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
490 let waited = before.elapsed();
491 assert_matches!(found, PaginationToken::HitEnd);
493 assert!(waited.as_secs() < 1);
495
496 let before = Instant::now();
498 let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
499 let waited = before.elapsed();
500 assert_matches!(found, PaginationToken::HitEnd);
502 assert!(waited.as_secs() < 1);
504
505 room_event_cache.clear().await.unwrap();
508
509 spawn(async move {
510 sleep(Duration::from_secs(1)).await;
511
512 room_event_cache
513 .inner
514 .state
515 .write()
516 .await
517 .with_events_mut(|events| {
518 events.push_events([f
519 .text_msg("this is the start of the timeline")
520 .into_event()]);
521 })
522 .await
523 .unwrap();
524 });
525
526 let before = Instant::now();
528 let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(2))).await;
529 let waited = before.elapsed();
530 assert_matches!(found, PaginationToken::HitEnd);
532 assert!(waited.as_secs() >= 2);
534 assert!(waited.as_secs() < 3);
535 }
536
537 #[async_test]
538 async fn test_wait_for_pagination_token_already_present() {
539 let client = logged_in_client(None).await;
540 let room_id = room_id!("!galette:saucisse.bzh");
541 client.base_client().get_or_create_room(room_id, RoomState::Joined);
542
543 let event_cache = client.event_cache();
544
545 event_cache.subscribe().unwrap();
546
547 let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
548
549 let expected_token = "old".to_owned();
550
551 {
553 room_event_cache
554 .inner
555 .state
556 .write()
557 .await
558 .with_events_mut(|room_events| {
559 room_events.push_gap(Gap { prev_token: expected_token.clone() });
560 room_events.push_events([EventFactory::new()
561 .text_msg("yolo")
562 .sender(user_id!("@b:z.h"))
563 .event_id(event_id!("$ida"))
564 .into_event()]);
565 })
566 .await
567 .unwrap();
568 }
569
570 let pagination = room_event_cache.pagination();
571
572 let found = pagination.get_or_wait_for_token(None).await;
574 assert_eq!(found, PaginationToken::HasMore(expected_token.clone()));
576
577 let before = Instant::now();
579 let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
580 let waited = before.elapsed();
581 assert_eq!(found, PaginationToken::HasMore(expected_token.clone()));
583 assert!(waited.as_millis() < 100);
585
586 let before = Instant::now();
588 let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
589 let waited = before.elapsed();
590 assert_eq!(found, PaginationToken::HasMore(expected_token));
592 assert!(waited.as_millis() < 100);
594 }
595
596 #[async_test]
597 async fn test_wait_for_late_pagination_token() {
598 let client = logged_in_client(None).await;
599 let room_id = room_id!("!galette:saucisse.bzh");
600 client.base_client().get_or_create_room(room_id, RoomState::Joined);
601
602 let event_cache = client.event_cache();
603
604 event_cache.subscribe().unwrap();
605
606 let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
607
608 let expected_token = "old".to_owned();
609
610 let before = Instant::now();
611 let cloned_expected_token = expected_token.clone();
612 let cloned_room_event_cache = room_event_cache.clone();
613 let insert_token_task = spawn(async move {
614 sleep(Duration::from_millis(400)).await;
616
617 cloned_room_event_cache
618 .inner
619 .state
620 .write()
621 .await
622 .with_events_mut(|events| {
623 events.push_gap(Gap { prev_token: cloned_expected_token })
624 })
625 .await
626 .unwrap();
627 });
628
629 let pagination = room_event_cache.pagination();
630
631 let found = pagination.get_or_wait_for_token(None).await;
633 assert_matches!(found, PaginationToken::None);
634
635 let found = pagination.get_or_wait_for_token(Some(Duration::from_millis(600))).await;
637 let waited = before.elapsed();
638
639 assert_eq!(found, PaginationToken::HasMore(expected_token));
641 assert!(waited.as_secs() < 1);
643 assert!(waited.as_millis() >= 400);
644
645 insert_token_task.await.unwrap();
647 }
648
649 #[async_test]
650 async fn test_get_latest_token() {
651 let client = logged_in_client(None).await;
652 let room_id = room_id!("!galette:saucisse.bzh");
653 client.base_client().get_or_create_room(room_id, RoomState::Joined);
654
655 let event_cache = client.event_cache();
656
657 event_cache.subscribe().unwrap();
658
659 let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
660
661 let old_token = "old".to_owned();
662 let new_token = "new".to_owned();
663
664 room_event_cache
667 .inner
668 .state
669 .write()
670 .await
671 .with_events_mut(|events| {
672 let f = EventFactory::new().room(room_id).sender(*ALICE);
673
674 events.push_gap(Gap { prev_token: old_token });
678 events.push_events([f.text_msg("oldest from cache").into()]);
679
680 events.push_gap(Gap { prev_token: new_token.clone() });
681 events.push_events([f.text_msg("sync'd gappy timeline").into()]);
682 })
683 .await
684 .unwrap();
685
686 let pagination = room_event_cache.pagination();
687
688 let found = pagination.get_or_wait_for_token(None).await;
691 assert_eq!(found, PaginationToken::HasMore(new_token));
692 }
693 }
694}