1pub mod pagination;
16mod state;
17mod subscriber;
18mod updates;
19
20use std::{
21 collections::BTreeMap,
22 fmt,
23 sync::{Arc, atomic::Ordering},
24};
25
26use eyeball::SharedObservable;
27use matrix_sdk_base::{
28 deserialized_responses::AmbiguityChange,
29 event_cache::Event,
30 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
31};
32use ruma::{
33 EventId, OwnedEventId, OwnedRoomId, RoomId,
34 events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
35 serde::Raw,
36};
37pub(super) use state::{LockedRoomEventCacheState, RoomEventCacheStateLockWriteGuard};
38pub use subscriber::RoomEventCacheSubscriber;
39use tokio::sync::{Notify, broadcast::Receiver, mpsc};
40use tracing::{instrument, trace, warn};
41pub use updates::{
42 RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate,
43 RoomEventCacheUpdateSender,
44};
45
46use super::{
47 super::{AutoShrinkChannelPayload, EventCacheError, EventsOrigin, Result, RoomPagination},
48 TimelineVectorDiffs,
49 event_linked_chunk::sort_positions_descending,
50 thread::pagination::ThreadPagination,
51};
52use crate::{
53 client::WeakClient,
54 event_cache::{
55 EventFocusThreadMode,
56 caches::{event_focused::EventFocusedCache, pagination::SharedPaginationStatus},
57 },
58 room::WeakRoom,
59};
60
/// A handle to the event cache of a single room.
///
/// The handle only wraps an [`Arc`] around [`RoomEventCacheInner`], so cloning
/// it yields another handle to the same underlying cache.
#[derive(Clone)]
pub struct RoomEventCache {
    /// Reference-counted inner state shared by every clone of this handle.
    inner: Arc<RoomEventCacheInner>,
}
68
69impl fmt::Debug for RoomEventCache {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.debug_struct("RoomEventCache").finish_non_exhaustive()
72 }
73}
74
75impl RoomEventCache {
76 pub(super) fn new(
78 room_id: OwnedRoomId,
79 weak_room: WeakRoom,
80 state: LockedRoomEventCacheState,
81 shared_pagination_status: SharedObservable<SharedPaginationStatus>,
82 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
83 update_sender: RoomEventCacheUpdateSender,
84 ) -> Self {
85 Self {
86 inner: Arc::new(RoomEventCacheInner::new(
87 room_id,
88 weak_room,
89 state,
90 shared_pagination_status,
91 auto_shrink_sender,
92 update_sender,
93 )),
94 }
95 }
96
97 pub fn room_id(&self) -> &RoomId {
99 &self.inner.room_id
100 }
101
102 pub async fn events(&self) -> Result<Vec<Event>> {
107 let state = self.inner.state.read().await?;
108
109 Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
110 }
111
112 pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
119 let state = self.inner.state.read().await?;
120 let events =
121 state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
122
123 let subscriber_count = state.subscriber_count();
124 let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
125 trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
126
127 let subscriber = RoomEventCacheSubscriber::new(
128 self.inner.update_sender.new_room_receiver(),
129 self.inner.room_id.clone(),
130 self.inner.auto_shrink_sender.clone(),
131 subscriber_count.clone(),
132 );
133
134 Ok((events, subscriber))
135 }
136
137 pub async fn subscribe_to_thread(
140 &self,
141 thread_root: OwnedEventId,
142 ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
143 let mut state = self.inner.state.write().await?;
144
145 state.subscribe_to_thread(thread_root).await
146 }
147
148 pub async fn subscribe_to_pinned_events(
158 &self,
159 ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
160 let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
161 let state = self.inner.state.read().await?;
162
163 state.subscribe_to_pinned_events(room).await
164 }
165
/// Returns the event-focused cache registered for `event_id` and
/// `thread_mode`, creating, starting and registering a new one if the room
/// state does not hold one yet.
///
/// `num_context_events` is only forwarded to `EventFocusedCache::start_from`
/// (presumably the number of surrounding events to load — confirm against
/// that method).
///
/// # Errors
///
/// Returns [`EventCacheError::ClientDropped`] if the weak room reference
/// yields nothing, and forwards errors from acquiring the state lock or from
/// `EventFocusedCache::start_from`.
#[instrument(skip(self), fields(room_id = %self.inner.room_id, event_id = %event_id, thread_mode = ?thread_mode))]
pub async fn get_or_create_event_focused_cache(
    &self,
    event_id: OwnedEventId,
    num_context_events: u16,
    thread_mode: EventFocusThreadMode,
) -> Result<EventFocusedCache> {
    let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
    let guard = self.inner.state.read().await?;

    // Fast path: a cache for this (event, thread mode) pair already exists.
    if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
        trace!("the cache was already created, returning it");
        return Ok(cache);
    }

    // Grab what the new cache needs while the read lock is still held.
    let linked_chunk_update_sender = guard.state.linked_chunk_update_sender.clone();

    // Release the read lock so it is not held across the `start_from` await
    // below, and so the write lock can be taken afterwards.
    drop(guard);

    let room_id = room.room_id().to_owned();
    let weak_room = WeakRoom::new(WeakClient::from_client(&room.client()), room_id.clone());

    trace!("creating a fresh event-focused cache");
    let cache = EventFocusedCache::new(weak_room, event_id.clone(), linked_chunk_update_sender);

    cache.start_from(room, num_context_events, thread_mode).await?;

    let mut guard = self.inner.state.write().await?;

    // The lock was released in between, so another caller may have inserted
    // a cache for the same key; prefer that one and discard ours.
    if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
        trace!("another cache has been racily created, returning it");
        return Ok(cache);
    }

    guard.insert_event_focused_cache(event_id, thread_mode, cache.clone());

    Ok(cache)
}
226
227 #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
234 pub async fn get_event_focused_cache(
235 &self,
236 event_id: OwnedEventId,
237 thread_mode: EventFocusThreadMode,
238 ) -> Result<Option<EventFocusedCache>> {
239 Ok(self.inner.state.read().await?.get_event_focused_cache(event_id, thread_mode))
240 }
241
242 pub fn pagination(&self) -> RoomPagination {
245 RoomPagination::new(self.inner.clone())
246 }
247
248 pub async fn thread_pagination(&self, thread_id: OwnedEventId) -> Result<ThreadPagination> {
251 Ok(self.inner.state.write().await?.get_or_reload_thread(thread_id).pagination())
252 }
253
254 pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
262 where
263 P: FnMut(&Event) -> Option<O>,
264 {
265 Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
266 }
267
268 pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
273 Ok(self
274 .inner
275 .state
276 .read()
277 .await?
278 .find_event(event_id)
279 .await
280 .ok()
281 .flatten()
282 .map(|(_loc, event)| event))
283 }
284
285 pub async fn find_event_with_relations(
297 &self,
298 event_id: &EventId,
299 filter: Option<Vec<RelationType>>,
300 ) -> Result<Option<(Event, Vec<Event>)>> {
301 Ok(self
303 .inner
304 .state
305 .read()
306 .await?
307 .find_event_with_relations(event_id, filter.clone())
308 .await
309 .ok()
310 .flatten())
311 }
312
313 pub async fn find_event_relations(
325 &self,
326 event_id: &EventId,
327 filter: Option<Vec<RelationType>>,
328 ) -> Result<Vec<Event>> {
329 self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
331 }
332
333 pub async fn clear(&self) -> Result<()> {
338 let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
340
341 self.inner.update_sender.send(
343 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
344 diffs: updates_as_vector_diffs,
345 origin: EventsOrigin::Cache,
346 }),
347 Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
348 );
349
350 Ok(())
351 }
352
353 pub(in super::super) fn state(&self) -> &LockedRoomEventCacheState {
355 &self.inner.state
356 }
357
358 #[instrument(skip_all, fields(room_id = %self.room_id()))]
360 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
361 self.inner
362 .handle_timeline(updates.timeline, updates.ephemeral.clone(), updates.ambiguity_changes)
363 .await?;
364 self.inner.handle_account_data(updates.account_data);
365
366 Ok(())
367 }
368
369 #[instrument(skip_all, fields(room_id = %self.room_id()))]
371 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
372 self.inner.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
373
374 Ok(())
375 }
376
377 pub(in super::super) fn update_sender(&self) -> &RoomEventCacheUpdateSender {
379 &self.inner.update_sender
380 }
381
382 pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
384 self.inner
385 .handle_timeline(
386 Timeline { limited: false, prev_batch: None, events: vec![event] },
387 Vec::new(),
388 BTreeMap::new(),
389 )
390 .await
391 }
392
393 pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
396 match self.inner.state.write().await {
397 Ok(mut state_guard) => {
398 if let Err(err) = state_guard.save_events(events).await {
399 warn!("couldn't save event in the event cache: {err}");
400 }
401 }
402
403 Err(err) => {
404 warn!("couldn't save event in the event cache: {err}");
405 }
406 }
407 }
408
409 pub async fn debug_string(&self) -> Vec<String> {
412 match self.inner.state.read().await {
413 Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
414 Err(err) => {
415 warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
416
417 vec![]
418 }
419 }
420 }
421}
422
/// The state shared, behind an `Arc`, by every clone of a [`RoomEventCache`].
pub(super) struct RoomEventCacheInner {
    /// Identifier of the room this cache belongs to.
    room_id: OwnedRoomId,

    /// Weak reference to the room; callers treat a failed `get()` as
    /// `EventCacheError::ClientDropped`.
    pub weak_room: WeakRoom,

    /// The room's event cache state, accessed through `read()` / `write()`
    /// locks.
    pub state: LockedRoomEventCacheState,

    /// Notified (one waiter) by `handle_timeline` when handling a sync
    /// reports that a previous-batch token was stored.
    pub pagination_batch_token_notifier: Notify,

    /// Observable pagination status.
    // NOTE(review): not read or written in this file section; presumably
    // shared with the pagination code — confirm.
    pub shared_pagination_status: SharedObservable<SharedPaginationStatus>,

    /// Sender cloned into every [`RoomEventCacheSubscriber`] created by
    /// `RoomEventCache::subscribe`.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Sender used to publish [`RoomEventCacheUpdate`]s (and optional generic
    /// updates) for this room.
    update_sender: RoomEventCacheUpdateSender,
}
447
448impl RoomEventCacheInner {
449 fn new(
452 room_id: OwnedRoomId,
453 weak_room: WeakRoom,
454 state: LockedRoomEventCacheState,
455 shared_pagination_status: SharedObservable<SharedPaginationStatus>,
456 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
457 update_sender: RoomEventCacheUpdateSender,
458 ) -> Self {
459 Self {
460 room_id,
461 weak_room,
462 state,
463 update_sender,
464 pagination_batch_token_notifier: Default::default(),
465 auto_shrink_sender,
466 shared_pagination_status,
467 }
468 }
469
470 fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
471 if account_data.is_empty() {
472 return;
473 }
474
475 let mut handled_read_marker = false;
476
477 trace!("Handling account data");
478
479 for raw_event in account_data {
480 match raw_event.deserialize() {
481 Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
482 if handled_read_marker {
485 continue;
486 }
487
488 handled_read_marker = true;
489
490 self.update_sender.send(
492 RoomEventCacheUpdate::MoveReadMarkerTo { event_id: ev.content.event_id },
493 None,
494 );
495 }
496
497 Ok(_) => {
498 }
501
502 Err(e) => {
503 let event_type = raw_event.get_field::<String>("type").ok().flatten();
504 warn!(event_type, "Failed to deserialize account data: {e}");
505 }
506 }
507 }
508 }
509
/// Handles the timeline part of a sync update, plus the ephemeral events
/// and ambiguity changes that came with it.
///
/// The timeline and ephemeral events are handed to the room state's
/// `handle_sync` under the write lock; the resulting diffs, the ephemeral
/// events and the ambiguity changes are then each published through the
/// update sender when non-empty.
///
/// # Errors
///
/// Fails if the write lock cannot be acquired or if `handle_sync` fails; no
/// update is sent in that case.
async fn handle_timeline(
    &self,
    timeline: Timeline,
    ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
    // Nothing at all to process: return without taking the write lock.
    if timeline.events.is_empty()
        && timeline.prev_batch.is_none()
        && ephemeral_events.is_empty()
        && ambiguity_changes.is_empty()
    {
        return Ok(());
    }

    trace!("adding new events");

    // The write guard is a temporary of this statement, so the lock is
    // released before anything is notified or sent below.
    let (stored_prev_batch_token, timeline_event_diffs) =
        self.state.write().await?.handle_sync(timeline, &ephemeral_events).await?;

    if stored_prev_batch_token {
        // Wake up one waiter on the pagination batch-token notifier.
        self.pagination_batch_token_notifier.notify_one();
    }

    if !timeline_event_diffs.is_empty() {
        // Timeline changes are the only update here that also carries a
        // generic (room-level) update.
        self.update_sender.send(
            RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                diffs: timeline_event_diffs,
                origin: EventsOrigin::Sync,
            }),
            Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
        );
    }

    if !ephemeral_events.is_empty() {
        self.update_sender
            .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events }, None);
    }

    if !ambiguity_changes.is_empty() {
        self.update_sender
            .send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes }, None);
    }

    Ok(())
}
562}
563
/// Origin tag for a post-processing step.
// NOTE(review): no use of this enum is visible in this file section; the
// per-variant meanings below are inferred from the names — confirm against
// the call sites.
#[derive(Clone, Copy)]
pub(in super::super) enum PostProcessingOrigin {
    /// Presumably: the processed events came from a sync.
    Sync,
    /// Presumably: the processed events came from a back-pagination.
    Backpagination,
    /// Presumably: the processed events were re-decrypted. Only available
    /// with the `e2e-encryption` feature.
    #[cfg(feature = "e2e-encryption")]
    Redecryption,
}
571
572#[cfg(test)]
573mod tests {
574 use matrix_sdk_base::{RoomState, event_cache::Event};
575 use matrix_sdk_test::{async_test, event_factory::EventFactory};
576 use ruma::{
577 RoomId, event_id,
578 events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
579 room_id, user_id,
580 };
581
582 use crate::test_utils::logged_in_client;
583
584 #[async_test]
585 async fn test_find_event_by_id_with_edit_relation() {
586 let original_id = event_id!("$original");
587 let related_id = event_id!("$related");
588 let room_id = room_id!("!galette:saucisse.bzh");
589 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
590
591 assert_relations(
592 room_id,
593 f.text_msg("Original event").event_id(original_id).into(),
594 f.text_msg("* An edited event")
595 .edit(
596 original_id,
597 RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
598 )
599 .event_id(related_id)
600 .into(),
601 f,
602 )
603 .await;
604 }
605
606 #[async_test]
607 async fn test_find_event_by_id_with_thread_reply_relation() {
608 let original_id = event_id!("$original");
609 let related_id = event_id!("$related");
610 let room_id = room_id!("!galette:saucisse.bzh");
611 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
612
613 assert_relations(
614 room_id,
615 f.text_msg("Original event").event_id(original_id).into(),
616 f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
617 f,
618 )
619 .await;
620 }
621
622 #[async_test]
623 async fn test_find_event_by_id_with_reaction_relation() {
624 let original_id = event_id!("$original");
625 let related_id = event_id!("$related");
626 let room_id = room_id!("!galette:saucisse.bzh");
627 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
628
629 assert_relations(
630 room_id,
631 f.text_msg("Original event").event_id(original_id).into(),
632 f.reaction(original_id, ":D").event_id(related_id).into(),
633 f,
634 )
635 .await;
636 }
637
638 #[async_test]
639 async fn test_find_event_by_id_with_poll_response_relation() {
640 let original_id = event_id!("$original");
641 let related_id = event_id!("$related");
642 let room_id = room_id!("!galette:saucisse.bzh");
643 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
644
645 assert_relations(
646 room_id,
647 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
648 .event_id(original_id)
649 .into(),
650 f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
651 f,
652 )
653 .await;
654 }
655
656 #[async_test]
657 async fn test_find_event_by_id_with_poll_end_relation() {
658 let original_id = event_id!("$original");
659 let related_id = event_id!("$related");
660 let room_id = room_id!("!galette:saucisse.bzh");
661 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
662
663 assert_relations(
664 room_id,
665 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
666 .event_id(original_id)
667 .into(),
668 f.poll_end("Poll ended", original_id).event_id(related_id).into(),
669 f,
670 )
671 .await;
672 }
673
/// Checks that the relation-type filter of `find_event_with_relations`
/// restricts which related events are returned.
#[async_test]
async fn test_find_event_by_id_with_filtered_relationships() {
    // A chain of three events: original <- edit of the original <- reaction
    // to the edit.
    let original_id = event_id!("$original");
    let related_id = event_id!("$related");
    let associated_related_id = event_id!("$recursive_related");
    let room_id = room_id!("!galette:saucisse.bzh");
    let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

    let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
    let related_event = event_factory
        .text_msg("* Edited event")
        .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
        .event_id(related_id)
        .into();
    let associated_related_event =
        event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();

    let client = logged_in_client(None).await;

    let event_cache = client.event_cache();
    event_cache.subscribe().unwrap();

    client.base_client().get_or_create_room(room_id, RoomState::Joined);
    let room = client.get_room(room_id).unwrap();

    let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

    // Save the original event.
    room_event_cache.save_events([original_event]).await;

    // Save the edit of the original event.
    room_event_cache.save_events([related_event]).await;

    // Save the reaction to the edit.
    room_event_cache.save_events([associated_related_event]).await;

    // Filtering on replacements: only the edit is expected.
    let filter = Some(vec![RelationType::Replacement]);
    let (event, related_events) = room_event_cache
        .find_event_with_relations(original_id, filter)
        .await
        .expect("Failed to find the event with relations")
        .expect("Event has no relation");
    // The returned event is the one that was asked for.
    let cached_event_id = event.event_id().unwrap();
    assert_eq!(cached_event_id, original_id);

    // Exactly one related event: the reaction to the edit is filtered out.
    assert_eq!(related_events.len(), 1);

    let related_event_id = related_events[0].event_id().unwrap();
    assert_eq!(related_event_id, related_id);

    // Filtering on thread relations: nothing matches.
    let filter = Some(vec![RelationType::Thread]);
    let (event, related_events) = room_event_cache
        .find_event_with_relations(original_id, filter)
        .await
        .expect("Failed to find the event with relations")
        .expect("Event has no relation");

    // The event itself is still found...
    let cached_event_id = event.event_id().unwrap();
    assert_eq!(cached_event_id, original_id);
    // ...but without any related event.
    assert!(related_events.is_empty());
}
740
/// Checks that, without a filter, `find_event_with_relations` also returns
/// events related to a related event.
#[async_test]
async fn test_find_event_by_id_with_recursive_relation() {
    // A chain of three events: original <- edit of the original <- reaction
    // to the edit.
    let original_id = event_id!("$original");
    let related_id = event_id!("$related");
    let associated_related_id = event_id!("$recursive_related");
    let room_id = room_id!("!galette:saucisse.bzh");
    let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

    let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
    let related_event = event_factory
        .text_msg("* Edited event")
        .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
        .event_id(related_id)
        .into();
    let associated_related_event =
        event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();

    let client = logged_in_client(None).await;

    let event_cache = client.event_cache();
    event_cache.subscribe().unwrap();

    client.base_client().get_or_create_room(room_id, RoomState::Joined);
    let room = client.get_room(room_id).unwrap();

    let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

    // Save the original event.
    room_event_cache.save_events([original_event]).await;

    // Save the edit of the original event.
    room_event_cache.save_events([related_event]).await;

    // Save the reaction to the edit.
    room_event_cache.save_events([associated_related_event]).await;

    // No filter: every relation is expected.
    let (event, related_events) = room_event_cache
        .find_event_with_relations(original_id, None)
        .await
        .expect("Failed to find the event with relations")
        .expect("Event has no relation");
    // The returned event is the one that was asked for.
    let cached_event_id = event.event_id().unwrap();
    assert_eq!(cached_event_id, original_id);

    // Both the edit and the reaction to the edit are returned.
    assert_eq!(related_events.len(), 2);

    // The edit comes first, the reaction to the edit second.
    let related_event_id = related_events[0].event_id().unwrap();
    assert_eq!(related_event_id, related_id);
    let related_event_id = related_events[1].event_id().unwrap();
    assert_eq!(related_event_id, associated_related_id);
}
794
/// Shared body of the relation tests.
///
/// Saves `original_event`, an unrelated text message built with
/// `event_factory`, and `related_event` into a fresh room event cache for
/// `room_id`, then asserts that looking up the original event (without a
/// filter) returns it with `related_event` as its first related event.
async fn assert_relations(
    room_id: &RoomId,
    original_event: Event,
    related_event: Event,
    event_factory: EventFactory,
) {
    let client = logged_in_client(None).await;

    let event_cache = client.event_cache();
    event_cache.subscribe().unwrap();

    client.base_client().get_or_create_room(room_id, RoomState::Joined);
    let room = client.get_room(room_id).unwrap();

    let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

    // Save the original event; remember its id before it is moved.
    let original_event_id = original_event.event_id().unwrap();
    room_event_cache.save_events([original_event]).await;

    // Save an event that has no relation to the original one.
    let unrelated_id = event_id!("$2");
    room_event_cache
        .save_events([event_factory
            .text_msg("An unrelated event")
            .event_id(unrelated_id)
            .into()])
        .await;

    // Save the related event; remember its id before it is moved.
    let related_id = related_event.event_id().unwrap();
    room_event_cache.save_events([related_event]).await;

    let (event, related_events) = room_event_cache
        .find_event_with_relations(&original_event_id, None)
        .await
        .expect("Failed to find the event with relations")
        .expect("Event has no relation");
    // The returned event is the one that was asked for.
    let cached_event_id = event.event_id().unwrap();
    assert_eq!(cached_event_id, original_event_id);

    // The first related event is the related one, not the unrelated one.
    let related_event_id = related_events[0].event_id().unwrap();
    assert_eq!(related_event_id, related_id);
}
841}
842
843#[cfg(all(test, not(target_family = "wasm")))] mod timed_tests {
845 use std::{ops::Not, sync::Arc};
846
847 use assert_matches::assert_matches;
848 use assert_matches2::assert_let;
849 use eyeball_im::VectorDiff;
850 use futures_util::FutureExt;
851 use matrix_sdk_base::{
852 RoomState,
853 event_cache::{
854 Gap,
855 store::{EventCacheStore as _, MemoryStore},
856 },
857 linked_chunk::{
858 ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
859 lazy_loader::from_all_chunks,
860 },
861 store::StoreConfig,
862 sync::{JoinedRoomUpdate, Timeline},
863 };
864 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
865 use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
866 use ruma::{
867 EventId, event_id,
868 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
869 room_id,
870 serde::Raw,
871 user_id,
872 };
873 use serde_json::json;
874 use tokio::task::yield_now;
875
876 use super::{
877 super::{
878 super::TimelineVectorDiffs, lock::Reload as _,
879 pagination::LoadMoreEventsBackwardsOutcome,
880 },
881 RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
882 };
883 use crate::{assert_let_timeout, test_utils::client::MockClientBuilder};
884
/// Checks that handling a joined-room update writes the gap and the events
/// to the event cache store.
#[async_test]
async fn test_write_to_storage() {
    let room_id = room_id!("!galette:saucisse.bzh");
    let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

    // Keep a handle on the store so its content can be inspected directly.
    let event_cache_store = Arc::new(MemoryStore::new());

    let client = MockClientBuilder::new(None)
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                    .event_cache_store(event_cache_store.clone()),
            )
        })
        .build()
        .await;

    let event_cache = client.event_cache();

    event_cache.subscribe().unwrap();

    client.base_client().get_or_create_room(room_id, RoomState::Joined);
    let room = client.get_room(room_id).unwrap();

    let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
    let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

    // A limited timeline with a previous-batch token and a single event.
    let timeline = Timeline {
        limited: true,
        prev_batch: Some("raclette".to_owned()),
        events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
    };

    room_event_cache
        .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
        .await
        .unwrap();

    // Exactly one generic update is received, for this room.
    assert_matches!(
        generic_stream.recv().await,
        Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
            assert_eq!(expected_room_id, room_id);
        }
    );
    assert!(generic_stream.is_empty());

    // Rebuild the linked chunk from what the store holds for this room.
    let linked_chunk = from_all_chunks::<3, _, _>(
        event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
    )
    .unwrap()
    .unwrap();

    assert_eq!(linked_chunk.chunks().count(), 2);

    let mut chunks = linked_chunk.chunks();

    // First chunk: the gap carrying the previous-batch token.
    assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
        assert_eq!(gap.token, "raclette");
    });

    // Second chunk: the single event of the timeline.
    assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
        assert_eq!(events.len(), 1);
        let deserialized = events[0].raw().deserialize().unwrap();
        assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
        assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
    });

    // Nothing else was stored.
    assert!(chunks.next().is_none());
}
961
/// Checks that an event keeps its bundled relations when read back from the
/// room event cache, while the copy written to the store has them removed.
#[async_test]
async fn test_write_to_storage_strips_bundled_relations() {
    let room_id = room_id!("!galette:saucisse.bzh");
    let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

    // Keep a handle on the store so its content can be inspected directly.
    let event_cache_store = Arc::new(MemoryStore::new());

    let client = MockClientBuilder::new(None)
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                    .event_cache_store(event_cache_store.clone()),
            )
        })
        .build()
        .await;

    let event_cache = client.event_cache();

    event_cache.subscribe().unwrap();

    client.base_client().get_or_create_room(room_id, RoomState::Joined);
    let room = client.get_room(room_id).unwrap();

    let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
    let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

    // A message that comes with a bundled edit.
    let ev = f
        .text_msg("hey yo")
        .sender(*ALICE)
        .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
        .into_event();

    let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };

    room_event_cache
        .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
        .await
        .unwrap();

    // Exactly one generic update is received, for this room.
    assert_matches!(
        generic_stream.recv().await,
        Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
            assert_eq!(expected_room_id, room_id);
        }
    );
    assert!(generic_stream.is_empty());

    {
        // Read through the room event cache: the bundled edit is still there.
        let events = room_event_cache.events().await.unwrap();

        assert_eq!(events.len(), 1);

        let ev = events[0].raw().deserialize().unwrap();
        assert_let!(
            AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
        );

        let original = msg.as_original().unwrap();
        assert_eq!(original.content.body(), "hey yo");
        assert!(original.unsigned.relations.replace.is_some());
    }

    // Rebuild the linked chunk from what the store holds for this room.
    let linked_chunk = from_all_chunks::<3, _, _>(
        event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
    )
    .unwrap()
    .unwrap();

    assert_eq!(linked_chunk.chunks().count(), 1);

    let mut chunks = linked_chunk.chunks();
    assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
        assert_eq!(events.len(), 1);

        let ev = events[0].raw().deserialize().unwrap();
        assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);

        // Same message, but the stored copy has no bundled edit.
        let original = msg.as_original().unwrap();
        assert_eq!(original.content.body(), "hey yo");
        assert!(original.unsigned.relations.replace.is_none());
    });

    // Nothing else was stored.
    assert!(chunks.next().is_none());
}
1053
/// Checks that `RoomEventCache::clear` empties the room's linked chunk (as
/// seen through the cache and through the store) and notifies subscribers.
#[async_test]
async fn test_clear() {
    let room_id = room_id!("!galette:saucisse.bzh");
    let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

    let event_cache_store = Arc::new(MemoryStore::new());

    let event_id1 = event_id!("$1");
    let event_id2 = event_id!("$2");

    let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
    let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

    // Prefill the store with: an empty items chunk, a gap, then two items
    // chunks holding one event each.
    event_cache_store
        .handle_linked_chunk_updates(
            LinkedChunkId::Room(room_id),
            vec![
                // An empty items chunk.
                Update::NewItemsChunk {
                    previous: None,
                    new: ChunkIdentifier::new(0),
                    next: None,
                },
                // A gap chunk, linked after chunk 0.
                Update::NewGapChunk {
                    previous: Some(ChunkIdentifier::new(0)),
                    new: ChunkIdentifier::new(42),
                    next: None,
                    gap: Gap { token: "comté".to_owned() },
                },
                // An items chunk, linked after the gap, holding `ev1`.
                Update::NewItemsChunk {
                    previous: Some(ChunkIdentifier::new(42)),
                    new: ChunkIdentifier::new(1),
                    next: None,
                },
                Update::PushItems {
                    at: Position::new(ChunkIdentifier::new(1), 0),
                    items: vec![ev1.clone()],
                },
                // A last items chunk holding `ev2`.
                Update::NewItemsChunk {
                    previous: Some(ChunkIdentifier::new(1)),
                    new: ChunkIdentifier::new(2),
                    next: None,
                },
                Update::PushItems {
                    at: Position::new(ChunkIdentifier::new(2), 0),
                    items: vec![ev2.clone()],
                },
            ],
        )
        .await
        .unwrap();

    let client = MockClientBuilder::new(None)
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                    .event_cache_store(event_cache_store.clone()),
            )
        })
        .build()
        .await;

    let event_cache = client.event_cache();

    event_cache.subscribe().unwrap();

    client.base_client().get_or_create_room(room_id, RoomState::Joined);
    let room = client.get_room(room_id).unwrap();

    let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

    let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
    let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

    {
        // Both prefilled events can be found through the cache.
        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
        assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
    }

    {
        // The initial subscription only yields the event of the last chunk.
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].event_id().unwrap(), event_id2);

        assert!(stream.is_empty());
    }

    {
        // One backwards pagination step brings in `ev1`, inserted at index 0.
        room_event_cache.pagination().run_backwards_once(20).await.unwrap();

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
            assert_eq!(event.event_id().unwrap(), event_id1);
        });

        assert!(stream.is_empty());

        // The pagination also produced exactly one generic update.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
                generic_stream.recv()
        );
        assert_eq!(room_id, expected_room_id);
        assert!(generic_stream.is_empty());
    }

    // Now clear the room's event cache.
    room_event_cache.clear().await.unwrap();

    // Subscribers are told to clear their timeline...
    assert_let_timeout!(
        Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
            stream.recv()
    );
    assert_eq!(diffs.len(), 1);
    assert_let!(VectorDiff::Clear = &diffs[0]);

    // ...and exactly one generic update is emitted for the room.
    assert_let_timeout!(
        Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
    );
    assert_eq!(received_room_id, room_id);
    assert!(generic_stream.is_empty());

    // An individual event can still be found after the clear.
    assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());

    // But the cache no longer lists any event for the room.
    let items = room_event_cache.events().await.unwrap();
    assert!(items.is_empty());

    // And the linked chunk rebuilt from the store holds no item either.
    let linked_chunk = from_all_chunks::<3, _, _>(
        event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
    )
    .unwrap()
    .unwrap();

    assert_eq!(linked_chunk.num_items(), 0);
}
1211
1212 #[async_test]
1213 async fn test_load_from_storage() {
1214 let room_id = room_id!("!galette:saucisse.bzh");
1215 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1216
1217 let event_cache_store = Arc::new(MemoryStore::new());
1218
1219 let event_id1 = event_id!("$1");
1220 let event_id2 = event_id!("$2");
1221
1222 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1223 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1224
1225 event_cache_store
1227 .handle_linked_chunk_updates(
1228 LinkedChunkId::Room(room_id),
1229 vec![
1230 Update::NewItemsChunk {
1232 previous: None,
1233 new: ChunkIdentifier::new(0),
1234 next: None,
1235 },
1236 Update::NewGapChunk {
1238 previous: Some(ChunkIdentifier::new(0)),
1239 new: ChunkIdentifier::new(42),
1241 next: None,
1242 gap: Gap { token: "cheddar".to_owned() },
1243 },
1244 Update::NewItemsChunk {
1246 previous: Some(ChunkIdentifier::new(42)),
1247 new: ChunkIdentifier::new(1),
1248 next: None,
1249 },
1250 Update::PushItems {
1251 at: Position::new(ChunkIdentifier::new(1), 0),
1252 items: vec![ev1.clone()],
1253 },
1254 Update::NewItemsChunk {
1256 previous: Some(ChunkIdentifier::new(1)),
1257 new: ChunkIdentifier::new(2),
1258 next: None,
1259 },
1260 Update::PushItems {
1261 at: Position::new(ChunkIdentifier::new(2), 0),
1262 items: vec![ev2.clone()],
1263 },
1264 ],
1265 )
1266 .await
1267 .unwrap();
1268
1269 let client = MockClientBuilder::new(None)
1270 .on_builder(|builder| {
1271 builder.store_config(
1272 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1273 .event_cache_store(event_cache_store.clone()),
1274 )
1275 })
1276 .build()
1277 .await;
1278
1279 let event_cache = client.event_cache();
1280
1281 event_cache.subscribe().unwrap();
1283
1284 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1286
1287 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1288 let room = client.get_room(room_id).unwrap();
1289
1290 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1291
1292 assert_matches!(
1295 generic_stream.recv().await,
1296 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1297 assert_eq!(room_id, expected_room_id);
1298 }
1299 );
1300 assert!(generic_stream.is_empty());
1301
1302 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
1303
1304 assert_eq!(items.len(), 1);
1307 assert_eq!(items[0].event_id().unwrap(), event_id2);
1308 assert!(stream.is_empty());
1309
1310 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1312 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
1313
1314 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1316
1317 assert_let_timeout!(
1318 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1319 stream.recv()
1320 );
1321 assert_eq!(diffs.len(), 1);
1322 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1323 assert_eq!(event.event_id().unwrap(), event_id1);
1324 });
1325
1326 assert!(stream.is_empty());
1327
1328 assert_matches!(
1330 generic_stream.recv().await,
1331 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1332 assert_eq!(expected_room_id, room_id);
1333 }
1334 );
1335 assert!(generic_stream.is_empty());
1336
1337 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
1339
1340 room_event_cache
1341 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1342 .await
1343 .unwrap();
1344
1345 assert!(generic_stream.recv().now_or_never().is_none());
1348
1349 let items = room_event_cache.events().await.unwrap();
1354 assert_eq!(items.len(), 2);
1355 assert_eq!(items[0].event_id().unwrap(), event_id1);
1356 assert_eq!(items[1].event_id().unwrap(), event_id2);
1357 }
1358
1359 #[async_test]
1360 async fn test_load_from_storage_resilient_to_failure() {
1361 let room_id = room_id!("!fondue:patate.ch");
1362 let event_cache_store = Arc::new(MemoryStore::new());
1363
1364 let event = EventFactory::new()
1365 .room(room_id)
1366 .sender(user_id!("@ben:saucisse.bzh"))
1367 .text_msg("foo")
1368 .event_id(event_id!("$42"))
1369 .into_event();
1370
1371 event_cache_store
1373 .handle_linked_chunk_updates(
1374 LinkedChunkId::Room(room_id),
1375 vec![
1376 Update::NewItemsChunk {
1377 previous: None,
1378 new: ChunkIdentifier::new(0),
1379 next: None,
1380 },
1381 Update::PushItems {
1382 at: Position::new(ChunkIdentifier::new(0), 0),
1383 items: vec![event],
1384 },
1385 Update::NewItemsChunk {
1386 previous: Some(ChunkIdentifier::new(0)),
1387 new: ChunkIdentifier::new(1),
1388 next: Some(ChunkIdentifier::new(0)),
1389 },
1390 ],
1391 )
1392 .await
1393 .unwrap();
1394
1395 let client = MockClientBuilder::new(None)
1396 .on_builder(|builder| {
1397 builder.store_config(
1398 StoreConfig::new(CrossProcessLockConfig::multi_process("holder"))
1399 .event_cache_store(event_cache_store.clone()),
1400 )
1401 })
1402 .build()
1403 .await;
1404
1405 let event_cache = client.event_cache();
1406
1407 event_cache.subscribe().unwrap();
1409
1410 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1411 let room = client.get_room(room_id).unwrap();
1412
1413 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1414
1415 let items = room_event_cache.events().await.unwrap();
1416
1417 assert!(items.is_empty());
1420
1421 let raw_chunks =
1424 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
1425 assert!(raw_chunks.is_empty());
1426 }
1427
1428 #[async_test]
1429 async fn test_no_useless_gaps() {
1430 let room_id = room_id!("!galette:saucisse.bzh");
1431
1432 let client = MockClientBuilder::new(None).build().await;
1433
1434 let event_cache = client.event_cache();
1435 event_cache.subscribe().unwrap();
1436
1437 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1438 let room = client.get_room(room_id).unwrap();
1439 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1440 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1441
1442 let f = EventFactory::new().room(room_id).sender(*ALICE);
1443
1444 room_event_cache
1447 .handle_joined_room_update(JoinedRoomUpdate {
1448 timeline: Timeline {
1449 limited: true,
1450 prev_batch: Some("raclette".to_owned()),
1451 events: vec![f.text_msg("hey yo").into_event()],
1452 },
1453 ..Default::default()
1454 })
1455 .await
1456 .unwrap();
1457
1458 assert_matches!(
1460 generic_stream.recv().await,
1461 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1462 assert_eq!(expected_room_id, room_id);
1463 }
1464 );
1465 assert!(generic_stream.is_empty());
1466
1467 {
1468 let state = room_event_cache.inner.state.read().await.unwrap();
1469
1470 let mut num_gaps = 0;
1471 let mut num_events = 0;
1472
1473 for c in state.room_linked_chunk().chunks() {
1474 match c.content() {
1475 ChunkContent::Items(items) => num_events += items.len(),
1476 ChunkContent::Gap(_) => num_gaps += 1,
1477 }
1478 }
1479
1480 assert_eq!(num_gaps, 0);
1483 assert_eq!(num_events, 1);
1484 }
1485
1486 assert_matches!(
1488 room_event_cache.pagination().load_more_events_backwards().await.unwrap(),
1489 LoadMoreEventsBackwardsOutcome::Gap { .. }
1490 );
1491
1492 {
1493 let state = room_event_cache.inner.state.read().await.unwrap();
1494
1495 let mut num_gaps = 0;
1496 let mut num_events = 0;
1497
1498 for c in state.room_linked_chunk().chunks() {
1499 match c.content() {
1500 ChunkContent::Items(items) => num_events += items.len(),
1501 ChunkContent::Gap(_) => num_gaps += 1,
1502 }
1503 }
1504
1505 assert_eq!(num_gaps, 1);
1507 assert_eq!(num_events, 1);
1508 }
1509
1510 room_event_cache
1513 .handle_joined_room_update(JoinedRoomUpdate {
1514 timeline: Timeline {
1515 limited: false,
1516 prev_batch: Some("fondue".to_owned()),
1517 events: vec![f.text_msg("sup").into_event()],
1518 },
1519 ..Default::default()
1520 })
1521 .await
1522 .unwrap();
1523
1524 assert_matches!(
1526 generic_stream.recv().await,
1527 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1528 assert_eq!(expected_room_id, room_id);
1529 }
1530 );
1531 assert!(generic_stream.is_empty());
1532
1533 {
1534 let state = room_event_cache.inner.state.read().await.unwrap();
1535
1536 let mut num_gaps = 0;
1537 let mut num_events = 0;
1538
1539 for c in state.room_linked_chunk().chunks() {
1540 match c.content() {
1541 ChunkContent::Items(items) => num_events += items.len(),
1542 ChunkContent::Gap(gap) => {
1543 assert_eq!(gap.token, "raclette");
1544 num_gaps += 1;
1545 }
1546 }
1547 }
1548
1549 assert_eq!(num_gaps, 1);
1551 assert_eq!(num_events, 2);
1552 }
1553 }
1554
1555 #[async_test]
1556 async fn test_shrink_to_last_chunk() {
1557 let room_id = room_id!("!galette:saucisse.bzh");
1558
1559 let client = MockClientBuilder::new(None).build().await;
1560
1561 let f = EventFactory::new().room(room_id);
1562
1563 let evid1 = event_id!("$1");
1564 let evid2 = event_id!("$2");
1565
1566 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1567 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1568
1569 {
1571 client
1572 .event_cache_store()
1573 .lock()
1574 .await
1575 .expect("Could not acquire the event cache lock")
1576 .as_clean()
1577 .expect("Could not acquire a clean event cache lock")
1578 .handle_linked_chunk_updates(
1579 LinkedChunkId::Room(room_id),
1580 vec![
1581 Update::NewItemsChunk {
1582 previous: None,
1583 new: ChunkIdentifier::new(0),
1584 next: None,
1585 },
1586 Update::PushItems {
1587 at: Position::new(ChunkIdentifier::new(0), 0),
1588 items: vec![ev1],
1589 },
1590 Update::NewItemsChunk {
1591 previous: Some(ChunkIdentifier::new(0)),
1592 new: ChunkIdentifier::new(1),
1593 next: None,
1594 },
1595 Update::PushItems {
1596 at: Position::new(ChunkIdentifier::new(1), 0),
1597 items: vec![ev2],
1598 },
1599 ],
1600 )
1601 .await
1602 .unwrap();
1603 }
1604
1605 let event_cache = client.event_cache();
1606 event_cache.subscribe().unwrap();
1607
1608 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1609 let room = client.get_room(room_id).unwrap();
1610 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1611
1612 let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
1614 assert_eq!(events.len(), 1);
1615 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1616 assert!(stream.is_empty());
1617
1618 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1619
1620 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1622 assert_eq!(outcome.events.len(), 1);
1623 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1624 assert!(outcome.reached_start);
1625
1626 assert_let_timeout!(
1628 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1629 stream.recv()
1630 );
1631 assert_eq!(diffs.len(), 1);
1632 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1633 assert_eq!(value.event_id().as_deref(), Some(evid1));
1634 });
1635
1636 assert!(stream.is_empty());
1637
1638 assert_let_timeout!(
1640 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1641 );
1642 assert_eq!(expected_room_id, room_id);
1643 assert!(generic_stream.is_empty());
1644
1645 room_event_cache
1647 .inner
1648 .state
1649 .write()
1650 .await
1651 .unwrap()
1652 .reload()
1653 .await
1654 .expect("shrinking should succeed");
1655
1656 assert_let_timeout!(
1658 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1659 stream.recv()
1660 );
1661 assert_eq!(diffs.len(), 2);
1662 assert_matches!(&diffs[0], VectorDiff::Clear);
1663 assert_matches!(&diffs[1], VectorDiff::Append { values} => {
1664 assert_eq!(values.len(), 1);
1665 assert_eq!(values[0].event_id().as_deref(), Some(evid2));
1666 });
1667
1668 assert!(stream.is_empty());
1669
1670 assert_let_timeout!(Ok(RoomEventCacheGenericUpdate { .. }) = generic_stream.recv());
1672 assert!(generic_stream.is_empty());
1673
1674 let events = room_event_cache.events().await.unwrap();
1676 assert_eq!(events.len(), 1);
1677 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1678
1679 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1682 assert_eq!(outcome.events.len(), 1);
1683 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1684 assert!(outcome.reached_start);
1685 }
1686
1687 #[async_test]
1688 async fn test_room_ordering() {
1689 let room_id = room_id!("!galette:saucisse.bzh");
1690
1691 let client = MockClientBuilder::new(None).build().await;
1692
1693 let f = EventFactory::new().room(room_id).sender(*ALICE);
1694
1695 let evid1 = event_id!("$1");
1696 let evid2 = event_id!("$2");
1697 let evid3 = event_id!("$3");
1698
1699 let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
1700 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1701 let ev3 = f.text_msg("yo").event_id(evid3).into_event();
1702
1703 {
1705 client
1706 .event_cache_store()
1707 .lock()
1708 .await
1709 .expect("Could not acquire the event cache lock")
1710 .as_clean()
1711 .expect("Could not acquire a clean event cache lock")
1712 .handle_linked_chunk_updates(
1713 LinkedChunkId::Room(room_id),
1714 vec![
1715 Update::NewItemsChunk {
1716 previous: None,
1717 new: ChunkIdentifier::new(0),
1718 next: None,
1719 },
1720 Update::PushItems {
1721 at: Position::new(ChunkIdentifier::new(0), 0),
1722 items: vec![ev1, ev2],
1723 },
1724 Update::NewItemsChunk {
1725 previous: Some(ChunkIdentifier::new(0)),
1726 new: ChunkIdentifier::new(1),
1727 next: None,
1728 },
1729 Update::PushItems {
1730 at: Position::new(ChunkIdentifier::new(1), 0),
1731 items: vec![ev3.clone()],
1732 },
1733 ],
1734 )
1735 .await
1736 .unwrap();
1737 }
1738
1739 let event_cache = client.event_cache();
1740 event_cache.subscribe().unwrap();
1741
1742 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1743 let room = client.get_room(room_id).unwrap();
1744 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1745
1746 {
1749 let state = room_event_cache.inner.state.read().await.unwrap();
1750 let room_linked_chunk = state.room_linked_chunk();
1751
1752 assert_eq!(
1754 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1755 Some(0)
1756 );
1757
1758 assert_eq!(
1760 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1761 Some(1)
1762 );
1763
1764 let mut events = room_linked_chunk.events();
1766 let (pos, ev) = events.next().unwrap();
1767 assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
1768 assert_eq!(ev.event_id().as_deref(), Some(evid3));
1769 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1770
1771 assert!(events.next().is_none());
1773 }
1774
1775 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1777 assert!(outcome.reached_start);
1778
1779 {
1782 let state = room_event_cache.inner.state.read().await.unwrap();
1783 let room_linked_chunk = state.room_linked_chunk();
1784
1785 for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
1786 assert_eq!(room_linked_chunk.event_order(pos), Some(i));
1787 }
1788 }
1789
1790 let evid4 = event_id!("$4");
1795 room_event_cache
1796 .handle_joined_room_update(JoinedRoomUpdate {
1797 timeline: Timeline {
1798 limited: true,
1799 prev_batch: Some("fondue".to_owned()),
1800 events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
1801 },
1802 ..Default::default()
1803 })
1804 .await
1805 .unwrap();
1806
1807 {
1808 let state = room_event_cache.inner.state.read().await.unwrap();
1809 let room_linked_chunk = state.room_linked_chunk();
1810
1811 let mut events = room_linked_chunk.events();
1813
1814 let (pos, ev) = events.next().unwrap();
1815 assert_eq!(ev.event_id().as_deref(), Some(evid3));
1816 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1817
1818 let (pos, ev) = events.next().unwrap();
1819 assert_eq!(ev.event_id().as_deref(), Some(evid4));
1820 assert_eq!(room_linked_chunk.event_order(pos), Some(3));
1821
1822 assert!(events.next().is_none());
1824
1825 assert_eq!(
1827 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1828 Some(0)
1829 );
1830 assert_eq!(
1831 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1832 Some(1)
1833 );
1834
1835 assert_eq!(
1838 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
1839 None
1840 );
1841 }
1842 }
1843
1844 #[async_test]
1845 async fn test_auto_shrink_after_all_subscribers_are_gone() {
1846 let room_id = room_id!("!galette:saucisse.bzh");
1847
1848 let client = MockClientBuilder::new(None).build().await;
1849
1850 let f = EventFactory::new().room(room_id);
1851
1852 let evid1 = event_id!("$1");
1853 let evid2 = event_id!("$2");
1854
1855 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1856 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1857
1858 {
1860 client
1861 .event_cache_store()
1862 .lock()
1863 .await
1864 .expect("Could not acquire the event cache lock")
1865 .as_clean()
1866 .expect("Could not acquire a clean event cache lock")
1867 .handle_linked_chunk_updates(
1868 LinkedChunkId::Room(room_id),
1869 vec![
1870 Update::NewItemsChunk {
1871 previous: None,
1872 new: ChunkIdentifier::new(0),
1873 next: None,
1874 },
1875 Update::PushItems {
1876 at: Position::new(ChunkIdentifier::new(0), 0),
1877 items: vec![ev1],
1878 },
1879 Update::NewItemsChunk {
1880 previous: Some(ChunkIdentifier::new(0)),
1881 new: ChunkIdentifier::new(1),
1882 next: None,
1883 },
1884 Update::PushItems {
1885 at: Position::new(ChunkIdentifier::new(1), 0),
1886 items: vec![ev2],
1887 },
1888 ],
1889 )
1890 .await
1891 .unwrap();
1892 }
1893
1894 let event_cache = client.event_cache();
1895 event_cache.subscribe().unwrap();
1896
1897 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1898 let room = client.get_room(room_id).unwrap();
1899 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1900
1901 let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
1903 assert_eq!(events1.len(), 1);
1904 assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
1905 assert!(stream1.is_empty());
1906
1907 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1908
1909 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1911 assert_eq!(outcome.events.len(), 1);
1912 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1913 assert!(outcome.reached_start);
1914
1915 assert_let_timeout!(
1918 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1919 stream1.recv()
1920 );
1921 assert_eq!(diffs.len(), 1);
1922 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1923 assert_eq!(value.event_id().as_deref(), Some(evid1));
1924 });
1925
1926 assert!(stream1.is_empty());
1927
1928 assert_let_timeout!(
1929 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1930 );
1931 assert_eq!(expected_room_id, room_id);
1932 assert!(generic_stream.is_empty());
1933
1934 let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
1938 assert_eq!(events2.len(), 2);
1939 assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
1940 assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
1941 assert!(stream2.is_empty());
1942
1943 drop(stream1);
1945 yield_now().await;
1946
1947 assert!(stream2.is_empty());
1949
1950 drop(stream2);
1952 yield_now().await;
1953
1954 {
1957 let state = room_event_cache.inner.state.read().await.unwrap();
1959 assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
1960 }
1961
1962 let events3 = room_event_cache.events().await.unwrap();
1964 assert_eq!(events3.len(), 1);
1965 assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
1966 }
1967
1968 #[async_test]
1969 async fn test_rfind_map_event_in_memory_by() {
1970 let user_id = user_id!("@mnt_io:matrix.org");
1971 let room_id = room_id!("!raclette:patate.ch");
1972 let client = MockClientBuilder::new(None).build().await;
1973
1974 let event_factory = EventFactory::new().room(room_id);
1975
1976 let event_id_0 = event_id!("$ev0");
1977 let event_id_1 = event_id!("$ev1");
1978 let event_id_2 = event_id!("$ev2");
1979 let event_id_3 = event_id!("$ev3");
1980
1981 let event_0 =
1982 event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
1983 let event_1 =
1984 event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
1985 let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
1986 let event_3 =
1987 event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
1988
1989 {
1992 client
1993 .event_cache_store()
1994 .lock()
1995 .await
1996 .expect("Could not acquire the event cache lock")
1997 .as_clean()
1998 .expect("Could not acquire a clean event cache lock")
1999 .handle_linked_chunk_updates(
2000 LinkedChunkId::Room(room_id),
2001 vec![
2002 Update::NewItemsChunk {
2003 previous: None,
2004 new: ChunkIdentifier::new(0),
2005 next: None,
2006 },
2007 Update::PushItems {
2008 at: Position::new(ChunkIdentifier::new(0), 0),
2009 items: vec![event_3],
2010 },
2011 Update::NewItemsChunk {
2012 previous: Some(ChunkIdentifier::new(0)),
2013 new: ChunkIdentifier::new(1),
2014 next: None,
2015 },
2016 Update::PushItems {
2017 at: Position::new(ChunkIdentifier::new(1), 0),
2018 items: vec![event_0, event_1, event_2],
2019 },
2020 ],
2021 )
2022 .await
2023 .unwrap();
2024 }
2025
2026 let event_cache = client.event_cache();
2027 event_cache.subscribe().unwrap();
2028
2029 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2030 let room = client.get_room(room_id).unwrap();
2031 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2032
2033 assert_matches!(
2035 room_event_cache
2036 .rfind_map_event_in_memory_by(|event| {
2037 (event.sender().as_deref() == Some(*BOB)).then(|| event.event_id())
2038 })
2039 .await,
2040 Ok(Some(event_id)) => {
2041 assert_eq!(event_id.as_deref(), Some(event_id_0));
2042 }
2043 );
2044
2045 assert_matches!(
2048 room_event_cache
2049 .rfind_map_event_in_memory_by(|event| {
2050 (event.sender().as_deref() == Some(*ALICE)).then(|| event.event_id())
2051 })
2052 .await,
2053 Ok(Some(event_id)) => {
2054 assert_eq!(event_id.as_deref(), Some(event_id_2));
2055 }
2056 );
2057
2058 assert!(
2060 room_event_cache
2061 .rfind_map_event_in_memory_by(|event| {
2062 (event.sender().as_deref() == Some(user_id)).then(|| event.event_id())
2063 })
2064 .await
2065 .unwrap()
2066 .is_none()
2067 );
2068
2069 assert!(
2071 room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap().is_none()
2072 );
2073 }
2074
2075 #[async_test]
2076 async fn test_reload_when_dirty() {
2077 let user_id = user_id!("@mnt_io:matrix.org");
2078 let room_id = room_id!("!raclette:patate.ch");
2079
2080 let event_cache_store = MemoryStore::new();
2082
2083 let client_p0 = MockClientBuilder::new(None)
2085 .on_builder(|builder| {
2086 builder.store_config(
2087 StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2088 .event_cache_store(event_cache_store.clone()),
2089 )
2090 })
2091 .build()
2092 .await;
2093
2094 let client_p1 = MockClientBuilder::new(None)
2096 .on_builder(|builder| {
2097 builder.store_config(
2098 StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2099 .event_cache_store(event_cache_store),
2100 )
2101 })
2102 .build()
2103 .await;
2104
2105 let event_factory = EventFactory::new().room(room_id).sender(user_id);
2106
2107 let ev_id_0 = event_id!("$ev_0");
2108 let ev_id_1 = event_id!("$ev_1");
2109
2110 let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
2111 let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
2112
2113 client_p0
2115 .event_cache_store()
2116 .lock()
2117 .await
2118 .expect("[p0] Could not acquire the event cache lock")
2119 .as_clean()
2120 .expect("[p0] Could not acquire a clean event cache lock")
2121 .handle_linked_chunk_updates(
2122 LinkedChunkId::Room(room_id),
2123 vec![
2124 Update::NewItemsChunk {
2125 previous: None,
2126 new: ChunkIdentifier::new(0),
2127 next: None,
2128 },
2129 Update::PushItems {
2130 at: Position::new(ChunkIdentifier::new(0), 0),
2131 items: vec![ev_0],
2132 },
2133 Update::NewItemsChunk {
2134 previous: Some(ChunkIdentifier::new(0)),
2135 new: ChunkIdentifier::new(1),
2136 next: None,
2137 },
2138 Update::PushItems {
2139 at: Position::new(ChunkIdentifier::new(1), 0),
2140 items: vec![ev_1],
2141 },
2142 ],
2143 )
2144 .await
2145 .unwrap();
2146
2147 let (room_event_cache_p0, room_event_cache_p1) = {
2149 let event_cache_p0 = client_p0.event_cache();
2150 event_cache_p0.subscribe().unwrap();
2151
2152 let event_cache_p1 = client_p1.event_cache();
2153 event_cache_p1.subscribe().unwrap();
2154
2155 client_p0.base_client().get_or_create_room(room_id, RoomState::Joined);
2156 client_p1.base_client().get_or_create_room(room_id, RoomState::Joined);
2157
2158 let (room_event_cache_p0, _drop_handles) =
2159 client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
2160 let (room_event_cache_p1, _drop_handles) =
2161 client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
2162
2163 (room_event_cache_p0, room_event_cache_p1)
2164 };
2165
2166 let mut updates_stream_p0 = {
2171 let room_event_cache = &room_event_cache_p0;
2172
2173 let (initial_updates, mut updates_stream) =
2174 room_event_cache_p0.subscribe().await.unwrap();
2175
2176 assert_eq!(initial_updates.len(), 1);
2178 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2179 assert!(updates_stream.is_empty());
2180
2181 assert!(event_loaded(room_event_cache, ev_id_1).await);
2183
2184 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2186
2187 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2189
2190 assert_matches!(
2192 updates_stream.recv().await.unwrap(),
2193 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2194 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2195 assert_matches!(
2196 &diffs[0],
2197 VectorDiff::Insert { index: 0, value: event } => {
2198 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2199 }
2200 );
2201 }
2202 );
2203
2204 assert!(event_loaded(room_event_cache, ev_id_0).await);
2206
2207 updates_stream
2208 };
2209
2210 let mut updates_stream_p1 = {
2212 let room_event_cache = &room_event_cache_p1;
2213 let (initial_updates, mut updates_stream) =
2214 room_event_cache_p1.subscribe().await.unwrap();
2215
2216 assert_eq!(initial_updates.len(), 1);
2218 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2219 assert!(updates_stream.is_empty());
2220
2221 assert!(event_loaded(room_event_cache, ev_id_1).await);
2223
2224 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2226
2227 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2229
2230 assert_matches!(
2232 updates_stream.recv().await.unwrap(),
2233 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2234 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2235 assert_matches!(
2236 &diffs[0],
2237 VectorDiff::Insert { index: 0, value: event } => {
2238 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2239 }
2240 );
2241 }
2242 );
2243
2244 assert!(event_loaded(room_event_cache, ev_id_0).await);
2246
2247 updates_stream
2248 };
2249
2250 for _ in 0..3 {
2252 {
2256 let room_event_cache = &room_event_cache_p0;
2257 let updates_stream = &mut updates_stream_p0;
2258
2259 assert!(event_loaded(room_event_cache, ev_id_1).await);
2261
2262 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2265
2266 assert_matches!(
2268 updates_stream.recv().await.unwrap(),
2269 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2270 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2271 assert_matches!(&diffs[0], VectorDiff::Clear);
2272 assert_matches!(
2273 &diffs[1],
2274 VectorDiff::Append { values: events } => {
2275 assert_eq!(events.len(), 1);
2276 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2277 }
2278 );
2279 }
2280 );
2281
2282 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2284
2285 assert!(event_loaded(room_event_cache, ev_id_0).await);
2287
2288 assert_matches!(
2290 updates_stream.recv().await.unwrap(),
2291 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2292 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2293 assert_matches!(
2294 &diffs[0],
2295 VectorDiff::Insert { index: 0, value: event } => {
2296 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2297 }
2298 );
2299 }
2300 );
2301 }
2302
2303 {
2307 let room_event_cache = &room_event_cache_p1;
2308 let updates_stream = &mut updates_stream_p1;
2309
2310 assert!(event_loaded(room_event_cache, ev_id_1).await);
2312
2313 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2316
2317 assert_matches!(
2319 updates_stream.recv().await.unwrap(),
2320 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2321 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2322 assert_matches!(&diffs[0], VectorDiff::Clear);
2323 assert_matches!(
2324 &diffs[1],
2325 VectorDiff::Append { values: events } => {
2326 assert_eq!(events.len(), 1);
2327 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2328 }
2329 );
2330 }
2331 );
2332
2333 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2335
2336 assert!(event_loaded(room_event_cache, ev_id_0).await);
2338
2339 assert_matches!(
2341 updates_stream.recv().await.unwrap(),
2342 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2343 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2344 assert_matches!(
2345 &diffs[0],
2346 VectorDiff::Insert { index: 0, value: event } => {
2347 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2348 }
2349 );
2350 }
2351 );
2352 }
2353 }
2354
2355 for _ in 0..3 {
2358 {
2359 let room_event_cache = &room_event_cache_p0;
2360 let updates_stream = &mut updates_stream_p0;
2361
2362 let guard = room_event_cache.inner.state.read().await.unwrap();
2363
2364 assert!(guard.is_dirty().not());
2370
2371 assert_matches!(
2373 updates_stream.recv().await.unwrap(),
2374 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2375 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2376 assert_matches!(&diffs[0], VectorDiff::Clear);
2377 assert_matches!(
2378 &diffs[1],
2379 VectorDiff::Append { values: events } => {
2380 assert_eq!(events.len(), 1);
2381 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2382 }
2383 );
2384 }
2385 );
2386
2387 assert!(event_loaded(room_event_cache, ev_id_1).await);
2388 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2389
2390 drop(guard);
2396
2397 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2398 assert!(event_loaded(room_event_cache, ev_id_0).await);
2399
2400 assert_matches!(
2402 updates_stream.recv().await.unwrap(),
2403 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2404 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2405 assert_matches!(
2406 &diffs[0],
2407 VectorDiff::Insert { index: 0, value: event } => {
2408 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2409 }
2410 );
2411 }
2412 );
2413 }
2414
2415 {
2416 let room_event_cache = &room_event_cache_p1;
2417 let updates_stream = &mut updates_stream_p1;
2418
2419 let guard = room_event_cache.inner.state.read().await.unwrap();
2420
2421 assert!(guard.is_dirty().not());
2426
2427 assert_matches!(
2429 updates_stream.recv().await.unwrap(),
2430 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2431 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2432 assert_matches!(&diffs[0], VectorDiff::Clear);
2433 assert_matches!(
2434 &diffs[1],
2435 VectorDiff::Append { values: events } => {
2436 assert_eq!(events.len(), 1);
2437 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2438 }
2439 );
2440 }
2441 );
2442
2443 assert!(event_loaded(room_event_cache, ev_id_1).await);
2444 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2445
2446 drop(guard);
2452
2453 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2454 assert!(event_loaded(room_event_cache, ev_id_0).await);
2455
2456 assert_matches!(
2458 updates_stream.recv().await.unwrap(),
2459 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2460 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2461 assert_matches!(
2462 &diffs[0],
2463 VectorDiff::Insert { index: 0, value: event } => {
2464 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2465 }
2466 );
2467 }
2468 );
2469 }
2470 }
2471
2472 for _ in 0..3 {
2474 {
2475 let room_event_cache = &room_event_cache_p0;
2476 let updates_stream = &mut updates_stream_p0;
2477
2478 let guard = room_event_cache.inner.state.write().await.unwrap();
2479
2480 assert!(guard.is_dirty().not());
2482
2483 assert_matches!(
2485 updates_stream.recv().await.unwrap(),
2486 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2487 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2488 assert_matches!(&diffs[0], VectorDiff::Clear);
2489 assert_matches!(
2490 &diffs[1],
2491 VectorDiff::Append { values: events } => {
2492 assert_eq!(events.len(), 1);
2493 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2494 }
2495 );
2496 }
2497 );
2498
2499 drop(guard);
2502
2503 assert!(event_loaded(room_event_cache, ev_id_1).await);
2504 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2505
2506 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2507 assert!(event_loaded(room_event_cache, ev_id_0).await);
2508
2509 assert_matches!(
2511 updates_stream.recv().await.unwrap(),
2512 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2513 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2514 assert_matches!(
2515 &diffs[0],
2516 VectorDiff::Insert { index: 0, value: event } => {
2517 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2518 }
2519 );
2520 }
2521 );
2522 }
2523
2524 {
2525 let room_event_cache = &room_event_cache_p1;
2526 let updates_stream = &mut updates_stream_p1;
2527
2528 let guard = room_event_cache.inner.state.write().await.unwrap();
2529
2530 assert!(guard.is_dirty().not());
2532
2533 assert_matches!(
2535 updates_stream.recv().await.unwrap(),
2536 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2537 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2538 assert_matches!(&diffs[0], VectorDiff::Clear);
2539 assert_matches!(
2540 &diffs[1],
2541 VectorDiff::Append { values: events } => {
2542 assert_eq!(events.len(), 1);
2543 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2544 }
2545 );
2546 }
2547 );
2548
2549 drop(guard);
2552
2553 assert!(event_loaded(room_event_cache, ev_id_1).await);
2554 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2555
2556 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2557 assert!(event_loaded(room_event_cache, ev_id_0).await);
2558
2559 assert_matches!(
2561 updates_stream.recv().await.unwrap(),
2562 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2563 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2564 assert_matches!(
2565 &diffs[0],
2566 VectorDiff::Insert { index: 0, value: event } => {
2567 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2568 }
2569 );
2570 }
2571 );
2572 }
2573 }
2574 }
2575
2576 #[async_test]
2577 async fn test_load_when_dirty() {
2578 let room_id_0 = room_id!("!raclette:patate.ch");
2579 let room_id_1 = room_id!("!morbiflette:patate.ch");
2580
2581 let event_cache_store = MemoryStore::new();
2583
2584 let client_p0 = MockClientBuilder::new(None)
2586 .on_builder(|builder| {
2587 builder.store_config(
2588 StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2589 .event_cache_store(event_cache_store.clone()),
2590 )
2591 })
2592 .build()
2593 .await;
2594
2595 let client_p1 = MockClientBuilder::new(None)
2597 .on_builder(|builder| {
2598 builder.store_config(
2599 StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2600 .event_cache_store(event_cache_store),
2601 )
2602 })
2603 .build()
2604 .await;
2605
2606 let (room_event_cache_0_p0, room_event_cache_0_p1) = {
2608 let event_cache_p0 = client_p0.event_cache();
2609 event_cache_p0.subscribe().unwrap();
2610
2611 let event_cache_p1 = client_p1.event_cache();
2612 event_cache_p1.subscribe().unwrap();
2613
2614 client_p0.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2615 client_p0.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2616
2617 client_p1.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2618 client_p1.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2619
2620 let (room_event_cache_0_p0, _drop_handles) =
2621 client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2622 let (room_event_cache_0_p1, _drop_handles) =
2623 client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2624
2625 (room_event_cache_0_p0, room_event_cache_0_p1)
2626 };
2627
2628 {
2630 drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
2631 drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
2632 }
2633
2634 let (room_event_cache_1_p0, _) =
2638 client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
2639
2640 {
2642 let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
2643 assert!(guard.is_dirty().not());
2644 }
2645
2646 }
2649
2650 #[async_test]
2651 async fn test_uniq_read_marker() {
2652 let client = MockClientBuilder::new(None).build().await;
2653 let room_id = room_id!("!galette:saucisse.bzh");
2654 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2655
2656 let event_cache = client.event_cache();
2657
2658 event_cache.subscribe().unwrap();
2659
2660 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2661 let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
2662 let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
2663
2664 assert!(events.is_empty());
2665
2666 let read_marker_event = Raw::from_json_string(
2668 json!({
2669 "content": {
2670 "event_id": "$crepe:saucisse.bzh"
2671 },
2672 "room_id": "!galette:saucisse.bzh",
2673 "type": "m.fully_read"
2674 })
2675 .to_string(),
2676 )
2677 .unwrap();
2678 let account_data = vec![read_marker_event; 100];
2679
2680 room_event_cache
2681 .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
2682 .await
2683 .unwrap();
2684
2685 assert_matches!(
2687 stream.recv().await.unwrap(),
2688 RoomEventCacheUpdate::MoveReadMarkerTo { .. }
2689 );
2690
2691 assert!(stream.recv().now_or_never().is_none());
2692
2693 assert!(generic_stream.recv().now_or_never().is_none());
2695 }
2696
2697 async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
2698 room_event_cache
2699 .rfind_map_event_in_memory_by(|event| {
2700 (event.event_id().as_deref() == Some(event_id)).then_some(())
2701 })
2702 .await
2703 .unwrap()
2704 .is_some()
2705 }
2706}