1pub mod pagination;
16mod state;
17mod subscriber;
18mod updates;
19
20use std::{
21 collections::BTreeMap,
22 fmt,
23 sync::{Arc, atomic::Ordering},
24};
25
26use eyeball::SharedObservable;
27use matrix_sdk_base::{
28 deserialized_responses::AmbiguityChange,
29 event_cache::Event,
30 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
31};
32use ruma::{
33 EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedUserId, RoomId,
34 events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
35 serde::Raw,
36};
37pub(super) use state::{LockedRoomEventCacheState, RoomEventCacheStateLockWriteGuard};
38pub use subscriber::RoomEventCacheSubscriber;
39use tokio::sync::{Notify, broadcast::Receiver, mpsc};
40use tracing::{instrument, trace, warn};
41pub use updates::{
42 RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate,
43 RoomEventCacheUpdateSender,
44};
45
46use super::{
47 super::{AutoShrinkChannelPayload, EventCacheError, EventsOrigin, Result, RoomPagination},
48 TimelineVectorDiffs,
49 event_linked_chunk::sort_positions_descending,
50 thread::pagination::ThreadPagination,
51};
52use crate::{
53 client::WeakClient,
54 event_cache::{
55 EventFocusThreadMode,
56 caches::{event_focused::EventFocusedCache, pagination::SharedPaginationStatus},
57 },
58 room::WeakRoom,
59};
60
61#[derive(Clone)]
65pub struct RoomEventCache {
66 inner: Arc<RoomEventCacheInner>,
67}
68
69impl fmt::Debug for RoomEventCache {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.debug_struct("RoomEventCache").finish_non_exhaustive()
72 }
73}
74
75impl RoomEventCache {
76 pub(super) fn new(
78 room_id: OwnedRoomId,
79 weak_room: WeakRoom,
80 state: LockedRoomEventCacheState,
81 shared_pagination_status: SharedObservable<SharedPaginationStatus>,
82 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
83 update_sender: RoomEventCacheUpdateSender,
84 ) -> Self {
85 Self {
86 inner: Arc::new(RoomEventCacheInner::new(
87 room_id,
88 weak_room,
89 state,
90 shared_pagination_status,
91 auto_shrink_sender,
92 update_sender,
93 )),
94 }
95 }
96
97 pub fn room_id(&self) -> &RoomId {
99 &self.inner.room_id
100 }
101
102 pub async fn events(&self) -> Result<Vec<Event>> {
107 let state = self.inner.state.read().await?;
108
109 Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
110 }
111
112 pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
119 let state = self.inner.state.read().await?;
120 let events =
121 state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
122
123 let subscriber_count = state.subscriber_count();
124 let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
125 trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
126
127 let subscriber = RoomEventCacheSubscriber::new(
128 self.inner.update_sender.new_room_receiver(),
129 self.inner.room_id.clone(),
130 self.inner.auto_shrink_sender.clone(),
131 subscriber_count.clone(),
132 );
133
134 Ok((events, subscriber))
135 }
136
137 pub async fn subscribe_to_thread(
140 &self,
141 thread_root: OwnedEventId,
142 ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
143 let mut state = self.inner.state.write().await?;
144
145 state.subscribe_to_thread(thread_root).await
146 }
147
148 pub async fn subscribe_to_pinned_events(
158 &self,
159 ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
160 let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
161 let state = self.inner.state.read().await?;
162
163 state.subscribe_to_pinned_events(room).await
164 }
165
166 #[instrument(skip(self), fields(room_id = %self.inner.room_id, event_id = %event_id, thread_mode = ?thread_mode))]
180 pub async fn get_or_create_event_focused_cache(
181 &self,
182 event_id: OwnedEventId,
183 num_context_events: u16,
184 thread_mode: EventFocusThreadMode,
185 ) -> Result<EventFocusedCache> {
186 let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
187 let guard = self.inner.state.read().await?;
188
189 if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
191 trace!("the cache was already created, returning it");
192 return Ok(cache);
193 }
194
195 let linked_chunk_update_sender = guard.state.linked_chunk_update_sender.clone();
197
198 drop(guard);
202
203 let room_id = room.room_id().to_owned();
204 let weak_room = WeakRoom::new(WeakClient::from_client(&room.client()), room_id.clone());
205
206 trace!("creating a fresh event-focused cache");
207 let cache = EventFocusedCache::new(weak_room, event_id.clone(), linked_chunk_update_sender);
208
209 cache.start_from(room, num_context_events, thread_mode).await?;
211
212 let mut guard = self.inner.state.write().await?;
213
214 if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
217 trace!("another cache has been racily created, returning it");
218 return Ok(cache);
219 }
220
221 guard.insert_event_focused_cache(event_id, thread_mode, cache.clone());
223
224 Ok(cache)
225 }
226
227 #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
234 pub async fn get_event_focused_cache(
235 &self,
236 event_id: OwnedEventId,
237 thread_mode: EventFocusThreadMode,
238 ) -> Result<Option<EventFocusedCache>> {
239 Ok(self.inner.state.read().await?.get_event_focused_cache(event_id, thread_mode))
240 }
241
242 pub fn pagination(&self) -> RoomPagination {
245 RoomPagination::new(self.inner.clone())
246 }
247
248 pub async fn thread_pagination(&self, thread_id: OwnedEventId) -> Result<ThreadPagination> {
251 Ok(self.inner.state.write().await?.get_or_reload_thread(thread_id).pagination())
252 }
253
254 pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
262 where
263 P: FnMut(&Event) -> Option<O>,
264 {
265 Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
266 }
267
268 pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
273 Ok(self
274 .inner
275 .state
276 .read()
277 .await?
278 .find_event(event_id)
279 .await
280 .ok()
281 .flatten()
282 .map(|(_loc, event)| event))
283 }
284
285 pub async fn find_event_with_relations(
297 &self,
298 event_id: &EventId,
299 filter: Option<Vec<RelationType>>,
300 ) -> Result<Option<(Event, Vec<Event>)>> {
301 Ok(self
303 .inner
304 .state
305 .read()
306 .await?
307 .find_event_with_relations(event_id, filter.clone())
308 .await
309 .ok()
310 .flatten())
311 }
312
313 pub async fn find_event_relations(
325 &self,
326 event_id: &EventId,
327 filter: Option<Vec<RelationType>>,
328 ) -> Result<Vec<Event>> {
329 self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
331 }
332
333 pub async fn clear(&self) -> Result<()> {
338 let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
340
341 self.inner.update_sender.send(
343 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
344 diffs: updates_as_vector_diffs,
345 origin: EventsOrigin::Cache,
346 }),
347 Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
348 );
349
350 Ok(())
351 }
352
353 pub(in super::super) fn state(&self) -> &LockedRoomEventCacheState {
355 &self.inner.state
356 }
357
358 #[instrument(skip_all, fields(room_id = %self.room_id()))]
360 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
361 self.inner
362 .handle_timeline(
363 updates.timeline,
364 updates.ephemeral.clone(),
365 updates.ambiguity_changes,
366 updates.avatar_changes,
367 )
368 .await?;
369 self.inner.handle_account_data(updates.account_data);
370
371 Ok(())
372 }
373
374 #[instrument(skip_all, fields(room_id = %self.room_id()))]
376 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
377 self.inner
378 .handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes, None)
379 .await?;
380
381 Ok(())
382 }
383
384 pub(in super::super) fn update_sender(&self) -> &RoomEventCacheUpdateSender {
386 &self.inner.update_sender
387 }
388
389 pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
391 self.inner.insert_sent_event_from_send_queue(event).await
392 }
393
394 pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
397 match self.inner.state.write().await {
398 Ok(mut state_guard) => {
399 if let Err(err) = state_guard.save_events(events).await {
400 warn!("couldn't save event in the event cache: {err}");
401 }
402 }
403
404 Err(err) => {
405 warn!("couldn't save event in the event cache: {err}");
406 }
407 }
408 }
409
410 pub async fn debug_string(&self) -> Vec<String> {
413 match self.inner.state.read().await {
414 Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
415 Err(err) => {
416 warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
417
418 vec![]
419 }
420 }
421 }
422}
423
424pub(super) struct RoomEventCacheInner {
426 room_id: OwnedRoomId,
428
429 pub weak_room: WeakRoom,
430
431 pub state: LockedRoomEventCacheState,
433
434 pub pagination_batch_token_notifier: Notify,
436
437 pub shared_pagination_status: SharedObservable<SharedPaginationStatus>,
438
439 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
444
445 update_sender: RoomEventCacheUpdateSender,
447}
448
449impl RoomEventCacheInner {
450 fn new(
453 room_id: OwnedRoomId,
454 weak_room: WeakRoom,
455 state: LockedRoomEventCacheState,
456 shared_pagination_status: SharedObservable<SharedPaginationStatus>,
457 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
458 update_sender: RoomEventCacheUpdateSender,
459 ) -> Self {
460 Self {
461 room_id,
462 weak_room,
463 state,
464 update_sender,
465 pagination_batch_token_notifier: Default::default(),
466 auto_shrink_sender,
467 shared_pagination_status,
468 }
469 }
470
471 fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
472 if account_data.is_empty() {
473 return;
474 }
475
476 let mut handled_read_marker = false;
477
478 trace!("Handling account data");
479
480 for raw_event in account_data {
481 match raw_event.deserialize() {
482 Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
483 if handled_read_marker {
486 continue;
487 }
488
489 handled_read_marker = true;
490
491 self.update_sender.send(
493 RoomEventCacheUpdate::MoveReadMarkerTo { event_id: ev.content.event_id },
494 None,
495 );
496 }
497
498 Ok(_) => {
499 }
502
503 Err(e) => {
504 let event_type = raw_event.get_field::<String>("type").ok().flatten();
505 warn!(event_type, "Failed to deserialize account data: {e}");
506 }
507 }
508 }
509 }
510
511 async fn handle_timeline(
514 &self,
515 timeline: Timeline,
516 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
517 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
518 avatar_changes: Option<BTreeMap<OwnedUserId, Option<OwnedMxcUri>>>,
519 ) -> Result<()> {
520 self.handle_timeline_inner(
521 self.state.write().await?,
522 timeline,
523 ephemeral_events,
524 ambiguity_changes,
525 avatar_changes,
526 )
527 .await
528 }
529
530 async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
534 let state = self.state.write().await?;
535
536 if state.room_linked_chunk().events().next().is_some() {
540 return self
541 .handle_timeline_inner(
542 state,
543 Timeline { limited: false, prev_batch: None, events: vec![event] },
544 Vec::new(),
545 BTreeMap::new(),
546 None,
547 )
548 .await;
549 }
550
551 Ok(())
552 }
553
554 async fn handle_timeline_inner(
555 &self,
556 mut state: RoomEventCacheStateLockWriteGuard<'_>,
557 timeline: Timeline,
558 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
559 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
560 avatar_changes: Option<BTreeMap<OwnedUserId, Option<OwnedMxcUri>>>,
561 ) -> Result<()> {
562 if timeline.events.is_empty()
563 && timeline.prev_batch.is_none()
564 && ephemeral_events.is_empty()
565 && ambiguity_changes.is_empty()
566 && avatar_changes.as_ref().is_none_or(|avatars| avatars.is_empty())
567 {
568 return Ok(());
569 }
570
571 trace!("adding new events");
573
574 let (stored_prev_batch_token, timeline_event_diffs) =
575 state.handle_sync(timeline, &ephemeral_events).await?;
576
577 drop(state);
578
579 if stored_prev_batch_token {
582 self.pagination_batch_token_notifier.notify_one();
583 }
584
585 if !timeline_event_diffs.is_empty() {
588 self.update_sender.send(
589 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
590 diffs: timeline_event_diffs,
591 origin: EventsOrigin::Sync,
592 }),
593 Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
594 );
595 }
596
597 if !ephemeral_events.is_empty() {
598 self.update_sender
599 .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events }, None);
600 }
601
602 if !ambiguity_changes.is_empty() || avatar_changes.as_ref().is_some_and(|c| !c.is_empty()) {
603 self.update_sender.send(
604 RoomEventCacheUpdate::UpdateMembers { ambiguity_changes, avatar_changes },
605 None,
606 );
607 }
608
609 Ok(())
610 }
611}
612
613#[derive(Clone, Copy)]
614pub(in super::super) enum PostProcessingOrigin {
615 Sync,
616 Backpagination,
617 #[cfg(feature = "e2e-encryption")]
618 Redecryption,
619}
620
621#[cfg(test)]
622mod tests {
623 use matrix_sdk_base::{RoomState, event_cache::Event};
624 use matrix_sdk_test::{async_test, event_factory::EventFactory};
625 use ruma::{
626 RoomId, event_id,
627 events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
628 room_id, user_id,
629 };
630
631 use crate::test_utils::logged_in_client;
632
633 #[async_test]
634 async fn test_find_event_by_id_with_edit_relation() {
635 let original_id = event_id!("$original");
636 let related_id = event_id!("$related");
637 let room_id = room_id!("!galette:saucisse.bzh");
638 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
639
640 assert_relations(
641 room_id,
642 f.text_msg("Original event").event_id(original_id).into(),
643 f.text_msg("* An edited event")
644 .edit(
645 original_id,
646 RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
647 )
648 .event_id(related_id)
649 .into(),
650 f,
651 )
652 .await;
653 }
654
655 #[async_test]
656 async fn test_find_event_by_id_with_thread_reply_relation() {
657 let original_id = event_id!("$original");
658 let related_id = event_id!("$related");
659 let room_id = room_id!("!galette:saucisse.bzh");
660 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
661
662 assert_relations(
663 room_id,
664 f.text_msg("Original event").event_id(original_id).into(),
665 f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
666 f,
667 )
668 .await;
669 }
670
671 #[async_test]
672 async fn test_find_event_by_id_with_reaction_relation() {
673 let original_id = event_id!("$original");
674 let related_id = event_id!("$related");
675 let room_id = room_id!("!galette:saucisse.bzh");
676 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
677
678 assert_relations(
679 room_id,
680 f.text_msg("Original event").event_id(original_id).into(),
681 f.reaction(original_id, ":D").event_id(related_id).into(),
682 f,
683 )
684 .await;
685 }
686
687 #[async_test]
688 async fn test_find_event_by_id_with_poll_response_relation() {
689 let original_id = event_id!("$original");
690 let related_id = event_id!("$related");
691 let room_id = room_id!("!galette:saucisse.bzh");
692 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
693
694 assert_relations(
695 room_id,
696 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
697 .event_id(original_id)
698 .into(),
699 f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
700 f,
701 )
702 .await;
703 }
704
705 #[async_test]
706 async fn test_find_event_by_id_with_poll_end_relation() {
707 let original_id = event_id!("$original");
708 let related_id = event_id!("$related");
709 let room_id = room_id!("!galette:saucisse.bzh");
710 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
711
712 assert_relations(
713 room_id,
714 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
715 .event_id(original_id)
716 .into(),
717 f.poll_end("Poll ended", original_id).event_id(related_id).into(),
718 f,
719 )
720 .await;
721 }
722
723 #[async_test]
724 async fn test_find_event_by_id_with_filtered_relationships() {
725 let original_id = event_id!("$original");
726 let related_id = event_id!("$related");
727 let associated_related_id = event_id!("$recursive_related");
728 let room_id = room_id!("!galette:saucisse.bzh");
729 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
730
731 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
732 let related_event = event_factory
733 .text_msg("* Edited event")
734 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
735 .event_id(related_id)
736 .into();
737 let associated_related_event =
738 event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
739
740 let client = logged_in_client(None).await;
741
742 let event_cache = client.event_cache();
743 event_cache.subscribe().unwrap();
744
745 client.base_client().get_or_create_room(room_id, RoomState::Joined);
746 let room = client.get_room(room_id).unwrap();
747
748 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
749
750 room_event_cache.save_events([original_event]).await;
752
753 room_event_cache.save_events([related_event]).await;
755
756 room_event_cache.save_events([associated_related_event]).await;
758
759 let filter = Some(vec![RelationType::Replacement]);
760 let (event, related_events) = room_event_cache
761 .find_event_with_relations(original_id, filter)
762 .await
763 .expect("Failed to find the event with relations")
764 .expect("Event has no relation");
765 let cached_event_id = event.event_id().unwrap();
767 assert_eq!(cached_event_id, original_id);
768
769 assert_eq!(related_events.len(), 1);
771
772 let related_event_id = related_events[0].event_id().unwrap();
773 assert_eq!(related_event_id, related_id);
774
775 let filter = Some(vec![RelationType::Thread]);
777 let (event, related_events) = room_event_cache
778 .find_event_with_relations(original_id, filter)
779 .await
780 .expect("Failed to find the event with relations")
781 .expect("Event has no relation");
782
783 let cached_event_id = event.event_id().unwrap();
785 assert_eq!(cached_event_id, original_id);
786 assert!(related_events.is_empty());
788 }
789
790 #[async_test]
791 async fn test_find_event_by_id_with_recursive_relation() {
792 let original_id = event_id!("$original");
793 let related_id = event_id!("$related");
794 let associated_related_id = event_id!("$recursive_related");
795 let room_id = room_id!("!galette:saucisse.bzh");
796 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
797
798 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
799 let related_event = event_factory
800 .text_msg("* Edited event")
801 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
802 .event_id(related_id)
803 .into();
804 let associated_related_event =
805 event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
806
807 let client = logged_in_client(None).await;
808
809 let event_cache = client.event_cache();
810 event_cache.subscribe().unwrap();
811
812 client.base_client().get_or_create_room(room_id, RoomState::Joined);
813 let room = client.get_room(room_id).unwrap();
814
815 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
816
817 room_event_cache.save_events([original_event]).await;
819
820 room_event_cache.save_events([related_event]).await;
822
823 room_event_cache.save_events([associated_related_event]).await;
825
826 let (event, related_events) = room_event_cache
827 .find_event_with_relations(original_id, None)
828 .await
829 .expect("Failed to find the event with relations")
830 .expect("Event has no relation");
831 let cached_event_id = event.event_id().unwrap();
833 assert_eq!(cached_event_id, original_id);
834
835 assert_eq!(related_events.len(), 2);
837
838 let related_event_id = related_events[0].event_id().unwrap();
839 assert_eq!(related_event_id, related_id);
840 let related_event_id = related_events[1].event_id().unwrap();
841 assert_eq!(related_event_id, associated_related_id);
842 }
843
844 async fn assert_relations(
845 room_id: &RoomId,
846 original_event: Event,
847 related_event: Event,
848 event_factory: EventFactory,
849 ) {
850 let client = logged_in_client(None).await;
851
852 let event_cache = client.event_cache();
853 event_cache.subscribe().unwrap();
854
855 client.base_client().get_or_create_room(room_id, RoomState::Joined);
856 let room = client.get_room(room_id).unwrap();
857
858 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
859
860 let original_event_id = original_event.event_id().unwrap();
862 room_event_cache.save_events([original_event]).await;
863
864 let unrelated_id = event_id!("$2");
866 room_event_cache
867 .save_events([event_factory
868 .text_msg("An unrelated event")
869 .event_id(unrelated_id)
870 .into()])
871 .await;
872
873 let related_id = related_event.event_id().unwrap();
875 room_event_cache.save_events([related_event]).await;
876
877 let (event, related_events) = room_event_cache
878 .find_event_with_relations(&original_event_id, None)
879 .await
880 .expect("Failed to find the event with relations")
881 .expect("Event has no relation");
882 let cached_event_id = event.event_id().unwrap();
884 assert_eq!(cached_event_id, original_event_id);
885
886 let related_event_id = related_events[0].event_id().unwrap();
888 assert_eq!(related_event_id, related_id);
889 }
890}
891
892#[cfg(all(test, not(target_family = "wasm")))] mod timed_tests {
894 use std::{ops::Not, sync::Arc};
895
896 use assert_matches::assert_matches;
897 use assert_matches2::assert_let;
898 use eyeball_im::VectorDiff;
899 use futures_util::FutureExt;
900 use matrix_sdk_base::{
901 RoomState,
902 event_cache::{
903 Gap,
904 store::{EventCacheStore as _, MemoryStore},
905 },
906 linked_chunk::{
907 ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
908 lazy_loader::from_all_chunks,
909 },
910 store::StoreConfig,
911 sync::{JoinedRoomUpdate, Timeline},
912 };
913 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
914 use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
915 use ruma::{
916 EventId, event_id,
917 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
918 room_id,
919 serde::Raw,
920 user_id,
921 };
922 use serde_json::json;
923 use tokio::task::yield_now;
924
925 use super::{
926 super::{
927 super::TimelineVectorDiffs, lock::Reload as _,
928 pagination::LoadMoreEventsBackwardsOutcome,
929 },
930 RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
931 };
932 use crate::{assert_let_timeout, test_utils::client::MockClientBuilder};
933
934 #[async_test]
935 async fn test_write_to_storage() {
936 let room_id = room_id!("!galette:saucisse.bzh");
937 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
938
939 let event_cache_store = Arc::new(MemoryStore::new());
940
941 let client = MockClientBuilder::new(None)
942 .on_builder(|builder| {
943 builder.store_config(
944 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
945 .event_cache_store(event_cache_store.clone()),
946 )
947 })
948 .build()
949 .await;
950
951 let event_cache = client.event_cache();
952
953 event_cache.subscribe().unwrap();
955
956 client.base_client().get_or_create_room(room_id, RoomState::Joined);
957 let room = client.get_room(room_id).unwrap();
958
959 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
960 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
961
962 let timeline = Timeline {
964 limited: true,
965 prev_batch: Some("raclette".to_owned()),
966 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
967 };
968
969 room_event_cache
970 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
971 .await
972 .unwrap();
973
974 assert_matches!(
976 generic_stream.recv().await,
977 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
978 assert_eq!(expected_room_id, room_id);
979 }
980 );
981 assert!(generic_stream.is_empty());
982
983 let linked_chunk = from_all_chunks::<3, _, _>(
985 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
986 )
987 .unwrap()
988 .unwrap();
989
990 assert_eq!(linked_chunk.chunks().count(), 2);
991
992 let mut chunks = linked_chunk.chunks();
993
994 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
996 assert_eq!(gap.token, "raclette");
997 });
998
999 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1001 assert_eq!(events.len(), 1);
1002 let deserialized = events[0].raw().deserialize().unwrap();
1003 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
1004 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
1005 });
1006
1007 assert!(chunks.next().is_none());
1009 }
1010
1011 #[async_test]
1012 async fn test_write_to_storage_strips_bundled_relations() {
1013 let room_id = room_id!("!galette:saucisse.bzh");
1014 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1015
1016 let event_cache_store = Arc::new(MemoryStore::new());
1017
1018 let client = MockClientBuilder::new(None)
1019 .on_builder(|builder| {
1020 builder.store_config(
1021 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1022 .event_cache_store(event_cache_store.clone()),
1023 )
1024 })
1025 .build()
1026 .await;
1027
1028 let event_cache = client.event_cache();
1029
1030 event_cache.subscribe().unwrap();
1032
1033 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1034 let room = client.get_room(room_id).unwrap();
1035
1036 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1037 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1038
1039 let ev = f
1041 .text_msg("hey yo")
1042 .sender(*ALICE)
1043 .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
1044 .into_event();
1045
1046 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
1047
1048 room_event_cache
1049 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1050 .await
1051 .unwrap();
1052
1053 assert_matches!(
1055 generic_stream.recv().await,
1056 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1057 assert_eq!(expected_room_id, room_id);
1058 }
1059 );
1060 assert!(generic_stream.is_empty());
1061
1062 {
1064 let events = room_event_cache.events().await.unwrap();
1065
1066 assert_eq!(events.len(), 1);
1067
1068 let ev = events[0].raw().deserialize().unwrap();
1069 assert_let!(
1070 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
1071 );
1072
1073 let original = msg.as_original().unwrap();
1074 assert_eq!(original.content.body(), "hey yo");
1075 assert!(original.unsigned.relations.replace.is_some());
1076 }
1077
1078 let linked_chunk = from_all_chunks::<3, _, _>(
1080 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
1081 )
1082 .unwrap()
1083 .unwrap();
1084
1085 assert_eq!(linked_chunk.chunks().count(), 1);
1086
1087 let mut chunks = linked_chunk.chunks();
1088 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1089 assert_eq!(events.len(), 1);
1090
1091 let ev = events[0].raw().deserialize().unwrap();
1092 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
1093
1094 let original = msg.as_original().unwrap();
1095 assert_eq!(original.content.body(), "hey yo");
1096 assert!(original.unsigned.relations.replace.is_none());
1097 });
1098
1099 assert!(chunks.next().is_none());
1101 }
1102
1103 #[async_test]
1104 async fn test_clear() {
1105 let room_id = room_id!("!galette:saucisse.bzh");
1106 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1107
1108 let event_cache_store = Arc::new(MemoryStore::new());
1109
1110 let event_id1 = event_id!("$1");
1111 let event_id2 = event_id!("$2");
1112
1113 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1114 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1115
1116 event_cache_store
1118 .handle_linked_chunk_updates(
1119 LinkedChunkId::Room(room_id),
1120 vec![
1121 Update::NewItemsChunk {
1123 previous: None,
1124 new: ChunkIdentifier::new(0),
1125 next: None,
1126 },
1127 Update::NewGapChunk {
1129 previous: Some(ChunkIdentifier::new(0)),
1130 new: ChunkIdentifier::new(42),
1132 next: None,
1133 gap: Gap { token: "comté".to_owned() },
1134 },
1135 Update::NewItemsChunk {
1137 previous: Some(ChunkIdentifier::new(42)),
1138 new: ChunkIdentifier::new(1),
1139 next: None,
1140 },
1141 Update::PushItems {
1142 at: Position::new(ChunkIdentifier::new(1), 0),
1143 items: vec![ev1.clone()],
1144 },
1145 Update::NewItemsChunk {
1147 previous: Some(ChunkIdentifier::new(1)),
1148 new: ChunkIdentifier::new(2),
1149 next: None,
1150 },
1151 Update::PushItems {
1152 at: Position::new(ChunkIdentifier::new(2), 0),
1153 items: vec![ev2.clone()],
1154 },
1155 ],
1156 )
1157 .await
1158 .unwrap();
1159
1160 let client = MockClientBuilder::new(None)
1161 .on_builder(|builder| {
1162 builder.store_config(
1163 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1164 .event_cache_store(event_cache_store.clone()),
1165 )
1166 })
1167 .build()
1168 .await;
1169
1170 let event_cache = client.event_cache();
1171
1172 event_cache.subscribe().unwrap();
1174
1175 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1176 let room = client.get_room(room_id).unwrap();
1177
1178 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1179
1180 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
1181 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1182
1183 {
1185 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1186 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
1187 }
1188
1189 {
1191 assert_eq!(items.len(), 1);
1193 assert_eq!(items[0].event_id().unwrap(), event_id2);
1194
1195 assert!(stream.is_empty());
1196 }
1197
1198 {
1200 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1201
1202 assert_let_timeout!(
1203 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1204 stream.recv()
1205 );
1206 assert_eq!(diffs.len(), 1);
1207 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1208 assert_eq!(event.event_id().unwrap(), event_id1);
1210 });
1211
1212 assert!(stream.is_empty());
1213
1214 assert_let_timeout!(
1215 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
1216 generic_stream.recv()
1217 );
1218 assert_eq!(room_id, expected_room_id);
1219 assert!(generic_stream.is_empty());
1220 }
1221
1222 room_event_cache.clear().await.unwrap();
1224
1225 assert_let_timeout!(
1227 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1228 stream.recv()
1229 );
1230 assert_eq!(diffs.len(), 1);
1231 assert_let!(VectorDiff::Clear = &diffs[0]);
1232
1233 assert_let_timeout!(
1235 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
1236 );
1237 assert_eq!(received_room_id, room_id);
1238 assert!(generic_stream.is_empty());
1239
1240 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1243
1244 let items = room_event_cache.events().await.unwrap();
1246 assert!(items.is_empty());
1247
1248 let linked_chunk = from_all_chunks::<3, _, _>(
1250 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
1251 )
1252 .unwrap()
1253 .unwrap();
1254
1255 assert_eq!(linked_chunk.num_items(), 0);
1259 }
1260
1261 #[async_test]
1262 async fn test_load_from_storage() {
1263 let room_id = room_id!("!galette:saucisse.bzh");
1264 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1265
1266 let event_cache_store = Arc::new(MemoryStore::new());
1267
1268 let event_id1 = event_id!("$1");
1269 let event_id2 = event_id!("$2");
1270
1271 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1272 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1273
1274 event_cache_store
1276 .handle_linked_chunk_updates(
1277 LinkedChunkId::Room(room_id),
1278 vec![
1279 Update::NewItemsChunk {
1281 previous: None,
1282 new: ChunkIdentifier::new(0),
1283 next: None,
1284 },
1285 Update::NewGapChunk {
1287 previous: Some(ChunkIdentifier::new(0)),
1288 new: ChunkIdentifier::new(42),
1290 next: None,
1291 gap: Gap { token: "cheddar".to_owned() },
1292 },
1293 Update::NewItemsChunk {
1295 previous: Some(ChunkIdentifier::new(42)),
1296 new: ChunkIdentifier::new(1),
1297 next: None,
1298 },
1299 Update::PushItems {
1300 at: Position::new(ChunkIdentifier::new(1), 0),
1301 items: vec![ev1.clone()],
1302 },
1303 Update::NewItemsChunk {
1305 previous: Some(ChunkIdentifier::new(1)),
1306 new: ChunkIdentifier::new(2),
1307 next: None,
1308 },
1309 Update::PushItems {
1310 at: Position::new(ChunkIdentifier::new(2), 0),
1311 items: vec![ev2.clone()],
1312 },
1313 ],
1314 )
1315 .await
1316 .unwrap();
1317
1318 let client = MockClientBuilder::new(None)
1319 .on_builder(|builder| {
1320 builder.store_config(
1321 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1322 .event_cache_store(event_cache_store.clone()),
1323 )
1324 })
1325 .build()
1326 .await;
1327
1328 let event_cache = client.event_cache();
1329
1330 event_cache.subscribe().unwrap();
1332
1333 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1335
1336 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1337 let room = client.get_room(room_id).unwrap();
1338
1339 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1340
1341 assert_matches!(
1344 generic_stream.recv().await,
1345 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1346 assert_eq!(room_id, expected_room_id);
1347 }
1348 );
1349 assert!(generic_stream.is_empty());
1350
1351 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
1352
1353 assert_eq!(items.len(), 1);
1356 assert_eq!(items[0].event_id().unwrap(), event_id2);
1357 assert!(stream.is_empty());
1358
1359 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1361 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
1362
1363 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1365
1366 assert_let_timeout!(
1367 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1368 stream.recv()
1369 );
1370 assert_eq!(diffs.len(), 1);
1371 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1372 assert_eq!(event.event_id().unwrap(), event_id1);
1373 });
1374
1375 assert!(stream.is_empty());
1376
1377 assert_matches!(
1379 generic_stream.recv().await,
1380 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1381 assert_eq!(expected_room_id, room_id);
1382 }
1383 );
1384 assert!(generic_stream.is_empty());
1385
1386 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
1388
1389 room_event_cache
1390 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1391 .await
1392 .unwrap();
1393
1394 assert!(generic_stream.recv().now_or_never().is_none());
1397
1398 let items = room_event_cache.events().await.unwrap();
1403 assert_eq!(items.len(), 2);
1404 assert_eq!(items[0].event_id().unwrap(), event_id1);
1405 assert_eq!(items[1].event_id().unwrap(), event_id2);
1406 }
1407
1408 #[async_test]
1409 async fn test_load_from_storage_resilient_to_failure() {
1410 let room_id = room_id!("!fondue:patate.ch");
1411 let event_cache_store = Arc::new(MemoryStore::new());
1412
1413 let event = EventFactory::new()
1414 .room(room_id)
1415 .sender(user_id!("@ben:saucisse.bzh"))
1416 .text_msg("foo")
1417 .event_id(event_id!("$42"))
1418 .into_event();
1419
1420 event_cache_store
1422 .handle_linked_chunk_updates(
1423 LinkedChunkId::Room(room_id),
1424 vec![
1425 Update::NewItemsChunk {
1426 previous: None,
1427 new: ChunkIdentifier::new(0),
1428 next: None,
1429 },
1430 Update::PushItems {
1431 at: Position::new(ChunkIdentifier::new(0), 0),
1432 items: vec![event],
1433 },
1434 Update::NewItemsChunk {
1435 previous: Some(ChunkIdentifier::new(0)),
1436 new: ChunkIdentifier::new(1),
1437 next: Some(ChunkIdentifier::new(0)),
1438 },
1439 ],
1440 )
1441 .await
1442 .unwrap();
1443
1444 let client = MockClientBuilder::new(None)
1445 .on_builder(|builder| {
1446 builder.store_config(
1447 StoreConfig::new(CrossProcessLockConfig::multi_process("holder"))
1448 .event_cache_store(event_cache_store.clone()),
1449 )
1450 })
1451 .build()
1452 .await;
1453
1454 let event_cache = client.event_cache();
1455
1456 event_cache.subscribe().unwrap();
1458
1459 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1460 let room = client.get_room(room_id).unwrap();
1461
1462 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1463
1464 let items = room_event_cache.events().await.unwrap();
1465
1466 assert!(items.is_empty());
1469
1470 let raw_chunks =
1473 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
1474 assert!(raw_chunks.is_empty());
1475 }
1476
1477 #[async_test]
1478 async fn test_no_useless_gaps() {
1479 let room_id = room_id!("!galette:saucisse.bzh");
1480
1481 let client = MockClientBuilder::new(None).build().await;
1482
1483 let event_cache = client.event_cache();
1484 event_cache.subscribe().unwrap();
1485
1486 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1487 let room = client.get_room(room_id).unwrap();
1488 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1489 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1490
1491 let f = EventFactory::new().room(room_id).sender(*ALICE);
1492
1493 room_event_cache
1496 .handle_joined_room_update(JoinedRoomUpdate {
1497 timeline: Timeline {
1498 limited: true,
1499 prev_batch: Some("raclette".to_owned()),
1500 events: vec![f.text_msg("hey yo").into_event()],
1501 },
1502 ..Default::default()
1503 })
1504 .await
1505 .unwrap();
1506
1507 assert_matches!(
1509 generic_stream.recv().await,
1510 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1511 assert_eq!(expected_room_id, room_id);
1512 }
1513 );
1514 assert!(generic_stream.is_empty());
1515
1516 {
1517 let state = room_event_cache.inner.state.read().await.unwrap();
1518
1519 let mut num_gaps = 0;
1520 let mut num_events = 0;
1521
1522 for c in state.room_linked_chunk().chunks() {
1523 match c.content() {
1524 ChunkContent::Items(items) => num_events += items.len(),
1525 ChunkContent::Gap(_) => num_gaps += 1,
1526 }
1527 }
1528
1529 assert_eq!(num_gaps, 0);
1532 assert_eq!(num_events, 1);
1533 }
1534
1535 assert_matches!(
1537 room_event_cache.pagination().load_more_events_backwards().await.unwrap(),
1538 LoadMoreEventsBackwardsOutcome::Gap { .. }
1539 );
1540
1541 {
1542 let state = room_event_cache.inner.state.read().await.unwrap();
1543
1544 let mut num_gaps = 0;
1545 let mut num_events = 0;
1546
1547 for c in state.room_linked_chunk().chunks() {
1548 match c.content() {
1549 ChunkContent::Items(items) => num_events += items.len(),
1550 ChunkContent::Gap(_) => num_gaps += 1,
1551 }
1552 }
1553
1554 assert_eq!(num_gaps, 1);
1556 assert_eq!(num_events, 1);
1557 }
1558
1559 room_event_cache
1562 .handle_joined_room_update(JoinedRoomUpdate {
1563 timeline: Timeline {
1564 limited: false,
1565 prev_batch: Some("fondue".to_owned()),
1566 events: vec![f.text_msg("sup").into_event()],
1567 },
1568 ..Default::default()
1569 })
1570 .await
1571 .unwrap();
1572
1573 assert_matches!(
1575 generic_stream.recv().await,
1576 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1577 assert_eq!(expected_room_id, room_id);
1578 }
1579 );
1580 assert!(generic_stream.is_empty());
1581
1582 {
1583 let state = room_event_cache.inner.state.read().await.unwrap();
1584
1585 let mut num_gaps = 0;
1586 let mut num_events = 0;
1587
1588 for c in state.room_linked_chunk().chunks() {
1589 match c.content() {
1590 ChunkContent::Items(items) => num_events += items.len(),
1591 ChunkContent::Gap(gap) => {
1592 assert_eq!(gap.token, "raclette");
1593 num_gaps += 1;
1594 }
1595 }
1596 }
1597
1598 assert_eq!(num_gaps, 1);
1600 assert_eq!(num_events, 2);
1601 }
1602 }
1603
1604 #[async_test]
1605 async fn test_shrink_to_last_chunk() {
1606 let room_id = room_id!("!galette:saucisse.bzh");
1607
1608 let client = MockClientBuilder::new(None).build().await;
1609
1610 let f = EventFactory::new().room(room_id);
1611
1612 let evid1 = event_id!("$1");
1613 let evid2 = event_id!("$2");
1614
1615 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1616 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1617
1618 {
1620 client
1621 .event_cache_store()
1622 .lock()
1623 .await
1624 .expect("Could not acquire the event cache lock")
1625 .as_clean()
1626 .expect("Could not acquire a clean event cache lock")
1627 .handle_linked_chunk_updates(
1628 LinkedChunkId::Room(room_id),
1629 vec![
1630 Update::NewItemsChunk {
1631 previous: None,
1632 new: ChunkIdentifier::new(0),
1633 next: None,
1634 },
1635 Update::PushItems {
1636 at: Position::new(ChunkIdentifier::new(0), 0),
1637 items: vec![ev1],
1638 },
1639 Update::NewItemsChunk {
1640 previous: Some(ChunkIdentifier::new(0)),
1641 new: ChunkIdentifier::new(1),
1642 next: None,
1643 },
1644 Update::PushItems {
1645 at: Position::new(ChunkIdentifier::new(1), 0),
1646 items: vec![ev2],
1647 },
1648 ],
1649 )
1650 .await
1651 .unwrap();
1652 }
1653
1654 let event_cache = client.event_cache();
1655 event_cache.subscribe().unwrap();
1656
1657 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1658 let room = client.get_room(room_id).unwrap();
1659 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1660
1661 let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
1663 assert_eq!(events.len(), 1);
1664 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1665 assert!(stream.is_empty());
1666
1667 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1668
1669 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1671 assert_eq!(outcome.events.len(), 1);
1672 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1673 assert!(outcome.reached_start);
1674
1675 assert_let_timeout!(
1677 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1678 stream.recv()
1679 );
1680 assert_eq!(diffs.len(), 1);
1681 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1682 assert_eq!(value.event_id().as_deref(), Some(evid1));
1683 });
1684
1685 assert!(stream.is_empty());
1686
1687 assert_let_timeout!(
1689 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1690 );
1691 assert_eq!(expected_room_id, room_id);
1692 assert!(generic_stream.is_empty());
1693
1694 room_event_cache
1696 .inner
1697 .state
1698 .write()
1699 .await
1700 .unwrap()
1701 .reload()
1702 .await
1703 .expect("shrinking should succeed");
1704
1705 assert_let_timeout!(
1707 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1708 stream.recv()
1709 );
1710 assert_eq!(diffs.len(), 2);
1711 assert_matches!(&diffs[0], VectorDiff::Clear);
1712 assert_matches!(&diffs[1], VectorDiff::Append { values} => {
1713 assert_eq!(values.len(), 1);
1714 assert_eq!(values[0].event_id().as_deref(), Some(evid2));
1715 });
1716
1717 assert!(stream.is_empty());
1718
1719 assert_let_timeout!(Ok(RoomEventCacheGenericUpdate { .. }) = generic_stream.recv());
1721 assert!(generic_stream.is_empty());
1722
1723 let events = room_event_cache.events().await.unwrap();
1725 assert_eq!(events.len(), 1);
1726 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1727
1728 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1731 assert_eq!(outcome.events.len(), 1);
1732 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1733 assert!(outcome.reached_start);
1734 }
1735
1736 #[async_test]
1737 async fn test_room_ordering() {
1738 let room_id = room_id!("!galette:saucisse.bzh");
1739
1740 let client = MockClientBuilder::new(None).build().await;
1741
1742 let f = EventFactory::new().room(room_id).sender(*ALICE);
1743
1744 let evid1 = event_id!("$1");
1745 let evid2 = event_id!("$2");
1746 let evid3 = event_id!("$3");
1747
1748 let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
1749 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1750 let ev3 = f.text_msg("yo").event_id(evid3).into_event();
1751
1752 {
1754 client
1755 .event_cache_store()
1756 .lock()
1757 .await
1758 .expect("Could not acquire the event cache lock")
1759 .as_clean()
1760 .expect("Could not acquire a clean event cache lock")
1761 .handle_linked_chunk_updates(
1762 LinkedChunkId::Room(room_id),
1763 vec![
1764 Update::NewItemsChunk {
1765 previous: None,
1766 new: ChunkIdentifier::new(0),
1767 next: None,
1768 },
1769 Update::PushItems {
1770 at: Position::new(ChunkIdentifier::new(0), 0),
1771 items: vec![ev1, ev2],
1772 },
1773 Update::NewItemsChunk {
1774 previous: Some(ChunkIdentifier::new(0)),
1775 new: ChunkIdentifier::new(1),
1776 next: None,
1777 },
1778 Update::PushItems {
1779 at: Position::new(ChunkIdentifier::new(1), 0),
1780 items: vec![ev3.clone()],
1781 },
1782 ],
1783 )
1784 .await
1785 .unwrap();
1786 }
1787
1788 let event_cache = client.event_cache();
1789 event_cache.subscribe().unwrap();
1790
1791 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1792 let room = client.get_room(room_id).unwrap();
1793 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1794
1795 {
1798 let state = room_event_cache.inner.state.read().await.unwrap();
1799 let room_linked_chunk = state.room_linked_chunk();
1800
1801 assert_eq!(
1803 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1804 Some(0)
1805 );
1806
1807 assert_eq!(
1809 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1810 Some(1)
1811 );
1812
1813 let mut events = room_linked_chunk.events();
1815 let (pos, ev) = events.next().unwrap();
1816 assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
1817 assert_eq!(ev.event_id().as_deref(), Some(evid3));
1818 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1819
1820 assert!(events.next().is_none());
1822 }
1823
1824 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1826 assert!(outcome.reached_start);
1827
1828 {
1831 let state = room_event_cache.inner.state.read().await.unwrap();
1832 let room_linked_chunk = state.room_linked_chunk();
1833
1834 for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
1835 assert_eq!(room_linked_chunk.event_order(pos), Some(i));
1836 }
1837 }
1838
1839 let evid4 = event_id!("$4");
1844 room_event_cache
1845 .handle_joined_room_update(JoinedRoomUpdate {
1846 timeline: Timeline {
1847 limited: true,
1848 prev_batch: Some("fondue".to_owned()),
1849 events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
1850 },
1851 ..Default::default()
1852 })
1853 .await
1854 .unwrap();
1855
1856 {
1857 let state = room_event_cache.inner.state.read().await.unwrap();
1858 let room_linked_chunk = state.room_linked_chunk();
1859
1860 let mut events = room_linked_chunk.events();
1862
1863 let (pos, ev) = events.next().unwrap();
1864 assert_eq!(ev.event_id().as_deref(), Some(evid3));
1865 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1866
1867 let (pos, ev) = events.next().unwrap();
1868 assert_eq!(ev.event_id().as_deref(), Some(evid4));
1869 assert_eq!(room_linked_chunk.event_order(pos), Some(3));
1870
1871 assert!(events.next().is_none());
1873
1874 assert_eq!(
1876 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1877 Some(0)
1878 );
1879 assert_eq!(
1880 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1881 Some(1)
1882 );
1883
1884 assert_eq!(
1887 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
1888 None
1889 );
1890 }
1891 }
1892
1893 #[async_test]
1894 async fn test_auto_shrink_after_all_subscribers_are_gone() {
1895 let room_id = room_id!("!galette:saucisse.bzh");
1896
1897 let client = MockClientBuilder::new(None).build().await;
1898
1899 let f = EventFactory::new().room(room_id);
1900
1901 let evid1 = event_id!("$1");
1902 let evid2 = event_id!("$2");
1903
1904 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1905 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1906
1907 {
1909 client
1910 .event_cache_store()
1911 .lock()
1912 .await
1913 .expect("Could not acquire the event cache lock")
1914 .as_clean()
1915 .expect("Could not acquire a clean event cache lock")
1916 .handle_linked_chunk_updates(
1917 LinkedChunkId::Room(room_id),
1918 vec![
1919 Update::NewItemsChunk {
1920 previous: None,
1921 new: ChunkIdentifier::new(0),
1922 next: None,
1923 },
1924 Update::PushItems {
1925 at: Position::new(ChunkIdentifier::new(0), 0),
1926 items: vec![ev1],
1927 },
1928 Update::NewItemsChunk {
1929 previous: Some(ChunkIdentifier::new(0)),
1930 new: ChunkIdentifier::new(1),
1931 next: None,
1932 },
1933 Update::PushItems {
1934 at: Position::new(ChunkIdentifier::new(1), 0),
1935 items: vec![ev2],
1936 },
1937 ],
1938 )
1939 .await
1940 .unwrap();
1941 }
1942
1943 let event_cache = client.event_cache();
1944 event_cache.subscribe().unwrap();
1945
1946 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1947 let room = client.get_room(room_id).unwrap();
1948 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1949
1950 let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
1952 assert_eq!(events1.len(), 1);
1953 assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
1954 assert!(stream1.is_empty());
1955
1956 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1957
1958 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1960 assert_eq!(outcome.events.len(), 1);
1961 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1962 assert!(outcome.reached_start);
1963
1964 assert_let_timeout!(
1967 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1968 stream1.recv()
1969 );
1970 assert_eq!(diffs.len(), 1);
1971 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1972 assert_eq!(value.event_id().as_deref(), Some(evid1));
1973 });
1974
1975 assert!(stream1.is_empty());
1976
1977 assert_let_timeout!(
1978 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1979 );
1980 assert_eq!(expected_room_id, room_id);
1981 assert!(generic_stream.is_empty());
1982
1983 let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
1987 assert_eq!(events2.len(), 2);
1988 assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
1989 assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
1990 assert!(stream2.is_empty());
1991
1992 drop(stream1);
1994 yield_now().await;
1995
1996 assert!(stream2.is_empty());
1998
1999 drop(stream2);
2001 yield_now().await;
2002
2003 {
2006 let state = room_event_cache.inner.state.read().await.unwrap();
2008 assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
2009 }
2010
2011 let events3 = room_event_cache.events().await.unwrap();
2013 assert_eq!(events3.len(), 1);
2014 assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
2015 }
2016
2017 #[async_test]
2018 async fn test_rfind_map_event_in_memory_by() {
2019 let user_id = user_id!("@mnt_io:matrix.org");
2020 let room_id = room_id!("!raclette:patate.ch");
2021 let client = MockClientBuilder::new(None).build().await;
2022
2023 let event_factory = EventFactory::new().room(room_id);
2024
2025 let event_id_0 = event_id!("$ev0");
2026 let event_id_1 = event_id!("$ev1");
2027 let event_id_2 = event_id!("$ev2");
2028 let event_id_3 = event_id!("$ev3");
2029
2030 let event_0 =
2031 event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
2032 let event_1 =
2033 event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
2034 let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
2035 let event_3 =
2036 event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
2037
2038 {
2041 client
2042 .event_cache_store()
2043 .lock()
2044 .await
2045 .expect("Could not acquire the event cache lock")
2046 .as_clean()
2047 .expect("Could not acquire a clean event cache lock")
2048 .handle_linked_chunk_updates(
2049 LinkedChunkId::Room(room_id),
2050 vec![
2051 Update::NewItemsChunk {
2052 previous: None,
2053 new: ChunkIdentifier::new(0),
2054 next: None,
2055 },
2056 Update::PushItems {
2057 at: Position::new(ChunkIdentifier::new(0), 0),
2058 items: vec![event_3],
2059 },
2060 Update::NewItemsChunk {
2061 previous: Some(ChunkIdentifier::new(0)),
2062 new: ChunkIdentifier::new(1),
2063 next: None,
2064 },
2065 Update::PushItems {
2066 at: Position::new(ChunkIdentifier::new(1), 0),
2067 items: vec![event_0, event_1, event_2],
2068 },
2069 ],
2070 )
2071 .await
2072 .unwrap();
2073 }
2074
2075 let event_cache = client.event_cache();
2076 event_cache.subscribe().unwrap();
2077
2078 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2079 let room = client.get_room(room_id).unwrap();
2080 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2081
2082 assert_matches!(
2084 room_event_cache
2085 .rfind_map_event_in_memory_by(|event| {
2086 (event.sender().as_deref() == Some(*BOB)).then(|| event.event_id())
2087 })
2088 .await,
2089 Ok(Some(event_id)) => {
2090 assert_eq!(event_id.as_deref(), Some(event_id_0));
2091 }
2092 );
2093
2094 assert_matches!(
2097 room_event_cache
2098 .rfind_map_event_in_memory_by(|event| {
2099 (event.sender().as_deref() == Some(*ALICE)).then(|| event.event_id())
2100 })
2101 .await,
2102 Ok(Some(event_id)) => {
2103 assert_eq!(event_id.as_deref(), Some(event_id_2));
2104 }
2105 );
2106
2107 assert!(
2109 room_event_cache
2110 .rfind_map_event_in_memory_by(|event| {
2111 (event.sender().as_deref() == Some(user_id)).then(|| event.event_id())
2112 })
2113 .await
2114 .unwrap()
2115 .is_none()
2116 );
2117
2118 assert!(
2120 room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap().is_none()
2121 );
2122 }
2123
2124 #[async_test]
2125 async fn test_reload_when_dirty() {
2126 let user_id = user_id!("@mnt_io:matrix.org");
2127 let room_id = room_id!("!raclette:patate.ch");
2128
2129 let event_cache_store = MemoryStore::new();
2131
2132 let client_p0 = MockClientBuilder::new(None)
2134 .on_builder(|builder| {
2135 builder.store_config(
2136 StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2137 .event_cache_store(event_cache_store.clone()),
2138 )
2139 })
2140 .build()
2141 .await;
2142
2143 let client_p1 = MockClientBuilder::new(None)
2145 .on_builder(|builder| {
2146 builder.store_config(
2147 StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2148 .event_cache_store(event_cache_store),
2149 )
2150 })
2151 .build()
2152 .await;
2153
2154 let event_factory = EventFactory::new().room(room_id).sender(user_id);
2155
2156 let ev_id_0 = event_id!("$ev_0");
2157 let ev_id_1 = event_id!("$ev_1");
2158
2159 let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
2160 let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
2161
2162 client_p0
2164 .event_cache_store()
2165 .lock()
2166 .await
2167 .expect("[p0] Could not acquire the event cache lock")
2168 .as_clean()
2169 .expect("[p0] Could not acquire a clean event cache lock")
2170 .handle_linked_chunk_updates(
2171 LinkedChunkId::Room(room_id),
2172 vec![
2173 Update::NewItemsChunk {
2174 previous: None,
2175 new: ChunkIdentifier::new(0),
2176 next: None,
2177 },
2178 Update::PushItems {
2179 at: Position::new(ChunkIdentifier::new(0), 0),
2180 items: vec![ev_0],
2181 },
2182 Update::NewItemsChunk {
2183 previous: Some(ChunkIdentifier::new(0)),
2184 new: ChunkIdentifier::new(1),
2185 next: None,
2186 },
2187 Update::PushItems {
2188 at: Position::new(ChunkIdentifier::new(1), 0),
2189 items: vec![ev_1],
2190 },
2191 ],
2192 )
2193 .await
2194 .unwrap();
2195
2196 let (room_event_cache_p0, room_event_cache_p1) = {
2198 let event_cache_p0 = client_p0.event_cache();
2199 event_cache_p0.subscribe().unwrap();
2200
2201 let event_cache_p1 = client_p1.event_cache();
2202 event_cache_p1.subscribe().unwrap();
2203
2204 client_p0.base_client().get_or_create_room(room_id, RoomState::Joined);
2205 client_p1.base_client().get_or_create_room(room_id, RoomState::Joined);
2206
2207 let (room_event_cache_p0, _drop_handles) =
2208 client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
2209 let (room_event_cache_p1, _drop_handles) =
2210 client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
2211
2212 (room_event_cache_p0, room_event_cache_p1)
2213 };
2214
2215 let mut updates_stream_p0 = {
2220 let room_event_cache = &room_event_cache_p0;
2221
2222 let (initial_updates, mut updates_stream) =
2223 room_event_cache_p0.subscribe().await.unwrap();
2224
2225 assert_eq!(initial_updates.len(), 1);
2227 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2228 assert!(updates_stream.is_empty());
2229
2230 assert!(event_loaded(room_event_cache, ev_id_1).await);
2232
2233 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2235
2236 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2238
2239 assert_matches!(
2241 updates_stream.recv().await.unwrap(),
2242 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2243 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2244 assert_matches!(
2245 &diffs[0],
2246 VectorDiff::Insert { index: 0, value: event } => {
2247 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2248 }
2249 );
2250 }
2251 );
2252
2253 assert!(event_loaded(room_event_cache, ev_id_0).await);
2255
2256 updates_stream
2257 };
2258
2259 let mut updates_stream_p1 = {
2261 let room_event_cache = &room_event_cache_p1;
2262 let (initial_updates, mut updates_stream) =
2263 room_event_cache_p1.subscribe().await.unwrap();
2264
2265 assert_eq!(initial_updates.len(), 1);
2267 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2268 assert!(updates_stream.is_empty());
2269
2270 assert!(event_loaded(room_event_cache, ev_id_1).await);
2272
2273 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2275
2276 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2278
2279 assert_matches!(
2281 updates_stream.recv().await.unwrap(),
2282 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2283 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2284 assert_matches!(
2285 &diffs[0],
2286 VectorDiff::Insert { index: 0, value: event } => {
2287 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2288 }
2289 );
2290 }
2291 );
2292
2293 assert!(event_loaded(room_event_cache, ev_id_0).await);
2295
2296 updates_stream
2297 };
2298
2299 for _ in 0..3 {
2301 {
2305 let room_event_cache = &room_event_cache_p0;
2306 let updates_stream = &mut updates_stream_p0;
2307
2308 assert!(event_loaded(room_event_cache, ev_id_1).await);
2310
2311 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2314
2315 assert_matches!(
2317 updates_stream.recv().await.unwrap(),
2318 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2319 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2320 assert_matches!(&diffs[0], VectorDiff::Clear);
2321 assert_matches!(
2322 &diffs[1],
2323 VectorDiff::Append { values: events } => {
2324 assert_eq!(events.len(), 1);
2325 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2326 }
2327 );
2328 }
2329 );
2330
2331 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2333
2334 assert!(event_loaded(room_event_cache, ev_id_0).await);
2336
2337 assert_matches!(
2339 updates_stream.recv().await.unwrap(),
2340 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2341 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2342 assert_matches!(
2343 &diffs[0],
2344 VectorDiff::Insert { index: 0, value: event } => {
2345 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2346 }
2347 );
2348 }
2349 );
2350 }
2351
2352 {
2356 let room_event_cache = &room_event_cache_p1;
2357 let updates_stream = &mut updates_stream_p1;
2358
2359 assert!(event_loaded(room_event_cache, ev_id_1).await);
2361
2362 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2365
2366 assert_matches!(
2368 updates_stream.recv().await.unwrap(),
2369 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2370 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2371 assert_matches!(&diffs[0], VectorDiff::Clear);
2372 assert_matches!(
2373 &diffs[1],
2374 VectorDiff::Append { values: events } => {
2375 assert_eq!(events.len(), 1);
2376 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2377 }
2378 );
2379 }
2380 );
2381
2382 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2384
2385 assert!(event_loaded(room_event_cache, ev_id_0).await);
2387
2388 assert_matches!(
2390 updates_stream.recv().await.unwrap(),
2391 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2392 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2393 assert_matches!(
2394 &diffs[0],
2395 VectorDiff::Insert { index: 0, value: event } => {
2396 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2397 }
2398 );
2399 }
2400 );
2401 }
2402 }
2403
2404 for _ in 0..3 {
2407 {
2408 let room_event_cache = &room_event_cache_p0;
2409 let updates_stream = &mut updates_stream_p0;
2410
2411 let guard = room_event_cache.inner.state.read().await.unwrap();
2412
2413 assert!(guard.is_dirty().not());
2419
2420 assert_matches!(
2422 updates_stream.recv().await.unwrap(),
2423 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2424 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2425 assert_matches!(&diffs[0], VectorDiff::Clear);
2426 assert_matches!(
2427 &diffs[1],
2428 VectorDiff::Append { values: events } => {
2429 assert_eq!(events.len(), 1);
2430 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2431 }
2432 );
2433 }
2434 );
2435
2436 assert!(event_loaded(room_event_cache, ev_id_1).await);
2437 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2438
2439 drop(guard);
2445
2446 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2447 assert!(event_loaded(room_event_cache, ev_id_0).await);
2448
2449 assert_matches!(
2451 updates_stream.recv().await.unwrap(),
2452 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2453 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2454 assert_matches!(
2455 &diffs[0],
2456 VectorDiff::Insert { index: 0, value: event } => {
2457 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2458 }
2459 );
2460 }
2461 );
2462 }
2463
2464 {
2465 let room_event_cache = &room_event_cache_p1;
2466 let updates_stream = &mut updates_stream_p1;
2467
2468 let guard = room_event_cache.inner.state.read().await.unwrap();
2469
2470 assert!(guard.is_dirty().not());
2475
2476 assert_matches!(
2478 updates_stream.recv().await.unwrap(),
2479 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2480 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2481 assert_matches!(&diffs[0], VectorDiff::Clear);
2482 assert_matches!(
2483 &diffs[1],
2484 VectorDiff::Append { values: events } => {
2485 assert_eq!(events.len(), 1);
2486 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2487 }
2488 );
2489 }
2490 );
2491
2492 assert!(event_loaded(room_event_cache, ev_id_1).await);
2493 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2494
2495 drop(guard);
2501
2502 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2503 assert!(event_loaded(room_event_cache, ev_id_0).await);
2504
2505 assert_matches!(
2507 updates_stream.recv().await.unwrap(),
2508 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2509 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2510 assert_matches!(
2511 &diffs[0],
2512 VectorDiff::Insert { index: 0, value: event } => {
2513 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2514 }
2515 );
2516 }
2517 );
2518 }
2519 }
2520
2521 for _ in 0..3 {
2523 {
2524 let room_event_cache = &room_event_cache_p0;
2525 let updates_stream = &mut updates_stream_p0;
2526
2527 let guard = room_event_cache.inner.state.write().await.unwrap();
2528
2529 assert!(guard.is_dirty().not());
2531
2532 assert_matches!(
2534 updates_stream.recv().await.unwrap(),
2535 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2536 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2537 assert_matches!(&diffs[0], VectorDiff::Clear);
2538 assert_matches!(
2539 &diffs[1],
2540 VectorDiff::Append { values: events } => {
2541 assert_eq!(events.len(), 1);
2542 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2543 }
2544 );
2545 }
2546 );
2547
2548 drop(guard);
2551
2552 assert!(event_loaded(room_event_cache, ev_id_1).await);
2553 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2554
2555 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2556 assert!(event_loaded(room_event_cache, ev_id_0).await);
2557
2558 assert_matches!(
2560 updates_stream.recv().await.unwrap(),
2561 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2562 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2563 assert_matches!(
2564 &diffs[0],
2565 VectorDiff::Insert { index: 0, value: event } => {
2566 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2567 }
2568 );
2569 }
2570 );
2571 }
2572
2573 {
2574 let room_event_cache = &room_event_cache_p1;
2575 let updates_stream = &mut updates_stream_p1;
2576
2577 let guard = room_event_cache.inner.state.write().await.unwrap();
2578
2579 assert!(guard.is_dirty().not());
2581
2582 assert_matches!(
2584 updates_stream.recv().await.unwrap(),
2585 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2586 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2587 assert_matches!(&diffs[0], VectorDiff::Clear);
2588 assert_matches!(
2589 &diffs[1],
2590 VectorDiff::Append { values: events } => {
2591 assert_eq!(events.len(), 1);
2592 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2593 }
2594 );
2595 }
2596 );
2597
2598 drop(guard);
2601
2602 assert!(event_loaded(room_event_cache, ev_id_1).await);
2603 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2604
2605 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2606 assert!(event_loaded(room_event_cache, ev_id_0).await);
2607
2608 assert_matches!(
2610 updates_stream.recv().await.unwrap(),
2611 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2612 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2613 assert_matches!(
2614 &diffs[0],
2615 VectorDiff::Insert { index: 0, value: event } => {
2616 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2617 }
2618 );
2619 }
2620 );
2621 }
2622 }
2623 }
2624
2625 #[async_test]
2626 async fn test_load_when_dirty() {
2627 let room_id_0 = room_id!("!raclette:patate.ch");
2628 let room_id_1 = room_id!("!morbiflette:patate.ch");
2629
2630 let event_cache_store = MemoryStore::new();
2632
2633 let client_p0 = MockClientBuilder::new(None)
2635 .on_builder(|builder| {
2636 builder.store_config(
2637 StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2638 .event_cache_store(event_cache_store.clone()),
2639 )
2640 })
2641 .build()
2642 .await;
2643
2644 let client_p1 = MockClientBuilder::new(None)
2646 .on_builder(|builder| {
2647 builder.store_config(
2648 StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2649 .event_cache_store(event_cache_store),
2650 )
2651 })
2652 .build()
2653 .await;
2654
2655 let (room_event_cache_0_p0, room_event_cache_0_p1) = {
2657 let event_cache_p0 = client_p0.event_cache();
2658 event_cache_p0.subscribe().unwrap();
2659
2660 let event_cache_p1 = client_p1.event_cache();
2661 event_cache_p1.subscribe().unwrap();
2662
2663 client_p0.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2664 client_p0.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2665
2666 client_p1.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2667 client_p1.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2668
2669 let (room_event_cache_0_p0, _drop_handles) =
2670 client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2671 let (room_event_cache_0_p1, _drop_handles) =
2672 client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2673
2674 (room_event_cache_0_p0, room_event_cache_0_p1)
2675 };
2676
2677 {
2679 drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
2680 drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
2681 }
2682
2683 let (room_event_cache_1_p0, _) =
2687 client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
2688
2689 {
2691 let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
2692 assert!(guard.is_dirty().not());
2693 }
2694
2695 }
2698
2699 #[async_test]
2700 async fn test_uniq_read_marker() {
2701 let client = MockClientBuilder::new(None).build().await;
2702 let room_id = room_id!("!galette:saucisse.bzh");
2703 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2704
2705 let event_cache = client.event_cache();
2706
2707 event_cache.subscribe().unwrap();
2708
2709 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2710 let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
2711 let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
2712
2713 assert!(events.is_empty());
2714
2715 let read_marker_event = Raw::from_json_string(
2717 json!({
2718 "content": {
2719 "event_id": "$crepe:saucisse.bzh"
2720 },
2721 "room_id": "!galette:saucisse.bzh",
2722 "type": "m.fully_read"
2723 })
2724 .to_string(),
2725 )
2726 .unwrap();
2727 let account_data = vec![read_marker_event; 100];
2728
2729 room_event_cache
2730 .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
2731 .await
2732 .unwrap();
2733
2734 assert_matches!(
2736 stream.recv().await.unwrap(),
2737 RoomEventCacheUpdate::MoveReadMarkerTo { .. }
2738 );
2739
2740 assert!(stream.recv().now_or_never().is_none());
2741
2742 assert!(generic_stream.recv().now_or_never().is_none());
2744 }
2745
2746 async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
2747 room_event_cache
2748 .rfind_map_event_in_memory_by(|event| {
2749 (event.event_id().as_deref() == Some(event_id)).then_some(())
2750 })
2751 .await
2752 .unwrap()
2753 .is_some()
2754 }
2755}