1pub mod pagination;
16mod state;
17mod subscriber;
18mod updates;
19
20use std::{
21 collections::BTreeMap,
22 fmt,
23 sync::{Arc, atomic::Ordering},
24};
25
26use eyeball::SharedObservable;
27use matrix_sdk_base::{
28 deserialized_responses::AmbiguityChange,
29 event_cache::Event,
30 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
31};
32use ruma::{
33 EventId, OwnedEventId, OwnedRoomId, RoomId,
34 events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
35 serde::Raw,
36};
37pub(super) use state::{LockedRoomEventCacheState, RoomEventCacheStateLockWriteGuard};
38pub use subscriber::RoomEventCacheSubscriber;
39use tokio::sync::{Notify, broadcast::Receiver, mpsc};
40use tracing::{instrument, trace, warn};
41pub use updates::{
42 RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate,
43 RoomEventCacheUpdateSender,
44};
45
46use super::{
47 super::{AutoShrinkChannelPayload, EventCacheError, EventsOrigin, Result, RoomPagination},
48 TimelineVectorDiffs,
49 event_linked_chunk::sort_positions_descending,
50 thread::pagination::ThreadPagination,
51};
52use crate::{
53 client::WeakClient,
54 event_cache::{
55 EventFocusThreadMode,
56 caches::{event_focused::EventFocusedCache, pagination::SharedPaginationStatus},
57 },
58 room::WeakRoom,
59};
60
61#[derive(Clone)]
65pub struct RoomEventCache {
66 inner: Arc<RoomEventCacheInner>,
67}
68
69impl fmt::Debug for RoomEventCache {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.debug_struct("RoomEventCache").finish_non_exhaustive()
72 }
73}
74
75impl RoomEventCache {
76 pub(super) fn new(
78 room_id: OwnedRoomId,
79 weak_room: WeakRoom,
80 state: LockedRoomEventCacheState,
81 shared_pagination_status: SharedObservable<SharedPaginationStatus>,
82 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
83 update_sender: RoomEventCacheUpdateSender,
84 ) -> Self {
85 Self {
86 inner: Arc::new(RoomEventCacheInner::new(
87 room_id,
88 weak_room,
89 state,
90 shared_pagination_status,
91 auto_shrink_sender,
92 update_sender,
93 )),
94 }
95 }
96
97 pub fn room_id(&self) -> &RoomId {
99 &self.inner.room_id
100 }
101
102 pub async fn events(&self) -> Result<Vec<Event>> {
107 let state = self.inner.state.read().await?;
108
109 Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
110 }
111
112 pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
119 let state = self.inner.state.read().await?;
120 let events =
121 state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
122
123 let subscriber_count = state.subscriber_count();
124 let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
125 trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
126
127 let subscriber = RoomEventCacheSubscriber::new(
128 self.inner.update_sender.new_room_receiver(),
129 self.inner.room_id.clone(),
130 self.inner.auto_shrink_sender.clone(),
131 subscriber_count.clone(),
132 );
133
134 Ok((events, subscriber))
135 }
136
137 pub async fn subscribe_to_thread(
140 &self,
141 thread_root: OwnedEventId,
142 ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
143 let mut state = self.inner.state.write().await?;
144
145 state.subscribe_to_thread(thread_root).await
146 }
147
148 pub async fn subscribe_to_pinned_events(
158 &self,
159 ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
160 let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
161 let state = self.inner.state.read().await?;
162
163 state.subscribe_to_pinned_events(room).await
164 }
165
166 #[instrument(skip(self), fields(room_id = %self.inner.room_id, event_id = %event_id, thread_mode = ?thread_mode))]
180 pub async fn get_or_create_event_focused_cache(
181 &self,
182 event_id: OwnedEventId,
183 num_context_events: u16,
184 thread_mode: EventFocusThreadMode,
185 ) -> Result<EventFocusedCache> {
186 let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
187 let guard = self.inner.state.read().await?;
188
189 if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
191 trace!("the cache was already created, returning it");
192 return Ok(cache);
193 }
194
195 let linked_chunk_update_sender = guard.state.linked_chunk_update_sender.clone();
197
198 drop(guard);
202
203 let room_id = room.room_id().to_owned();
204 let weak_room = WeakRoom::new(WeakClient::from_client(&room.client()), room_id.clone());
205
206 trace!("creating a fresh event-focused cache");
207 let cache = EventFocusedCache::new(weak_room, event_id.clone(), linked_chunk_update_sender);
208
209 cache.start_from(room, num_context_events, thread_mode).await?;
211
212 let mut guard = self.inner.state.write().await?;
213
214 if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
217 trace!("another cache has been racily created, returning it");
218 return Ok(cache);
219 }
220
221 guard.insert_event_focused_cache(event_id, thread_mode, cache.clone());
223
224 Ok(cache)
225 }
226
227 #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
234 pub async fn get_event_focused_cache(
235 &self,
236 event_id: OwnedEventId,
237 thread_mode: EventFocusThreadMode,
238 ) -> Result<Option<EventFocusedCache>> {
239 Ok(self.inner.state.read().await?.get_event_focused_cache(event_id, thread_mode))
240 }
241
242 pub fn pagination(&self) -> RoomPagination {
245 RoomPagination::new(self.inner.clone())
246 }
247
248 pub async fn thread_pagination(&self, thread_id: OwnedEventId) -> Result<ThreadPagination> {
251 Ok(self.inner.state.write().await?.get_or_reload_thread(thread_id).pagination())
252 }
253
254 pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
262 where
263 P: FnMut(&Event) -> Option<O>,
264 {
265 Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
266 }
267
268 pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
273 Ok(self
274 .inner
275 .state
276 .read()
277 .await?
278 .find_event(event_id)
279 .await
280 .ok()
281 .flatten()
282 .map(|(_loc, event)| event))
283 }
284
285 pub async fn find_event_with_relations(
297 &self,
298 event_id: &EventId,
299 filter: Option<Vec<RelationType>>,
300 ) -> Result<Option<(Event, Vec<Event>)>> {
301 Ok(self
303 .inner
304 .state
305 .read()
306 .await?
307 .find_event_with_relations(event_id, filter.clone())
308 .await
309 .ok()
310 .flatten())
311 }
312
313 pub async fn find_event_relations(
325 &self,
326 event_id: &EventId,
327 filter: Option<Vec<RelationType>>,
328 ) -> Result<Vec<Event>> {
329 self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
331 }
332
333 pub async fn clear(&self) -> Result<()> {
338 let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
340
341 self.inner.update_sender.send(
343 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
344 diffs: updates_as_vector_diffs,
345 origin: EventsOrigin::Cache,
346 }),
347 Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
348 );
349
350 Ok(())
351 }
352
353 pub(in super::super) fn state(&self) -> &LockedRoomEventCacheState {
355 &self.inner.state
356 }
357
358 #[instrument(skip_all, fields(room_id = %self.room_id()))]
360 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
361 self.inner
362 .handle_timeline(updates.timeline, updates.ephemeral.clone(), updates.ambiguity_changes)
363 .await?;
364 self.inner.handle_account_data(updates.account_data);
365
366 Ok(())
367 }
368
369 #[instrument(skip_all, fields(room_id = %self.room_id()))]
371 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
372 self.inner.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
373
374 Ok(())
375 }
376
377 pub(in super::super) fn update_sender(&self) -> &RoomEventCacheUpdateSender {
379 &self.inner.update_sender
380 }
381
382 pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
384 self.inner
385 .handle_timeline(
386 Timeline { limited: false, prev_batch: None, events: vec![event] },
387 Vec::new(),
388 BTreeMap::new(),
389 )
390 .await
391 }
392
393 pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
396 match self.inner.state.write().await {
397 Ok(mut state_guard) => {
398 if let Err(err) = state_guard.save_events(events).await {
399 warn!("couldn't save event in the event cache: {err}");
400 }
401 }
402
403 Err(err) => {
404 warn!("couldn't save event in the event cache: {err}");
405 }
406 }
407 }
408
409 pub async fn debug_string(&self) -> Vec<String> {
412 match self.inner.state.read().await {
413 Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
414 Err(err) => {
415 warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
416
417 vec![]
418 }
419 }
420 }
421}
422
423pub(super) struct RoomEventCacheInner {
425 room_id: OwnedRoomId,
427
428 pub weak_room: WeakRoom,
429
430 pub state: LockedRoomEventCacheState,
432
433 pub pagination_batch_token_notifier: Notify,
435
436 pub shared_pagination_status: SharedObservable<SharedPaginationStatus>,
437
438 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
443
444 update_sender: RoomEventCacheUpdateSender,
446}
447
448impl RoomEventCacheInner {
449 fn new(
452 room_id: OwnedRoomId,
453 weak_room: WeakRoom,
454 state: LockedRoomEventCacheState,
455 shared_pagination_status: SharedObservable<SharedPaginationStatus>,
456 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
457 update_sender: RoomEventCacheUpdateSender,
458 ) -> Self {
459 Self {
460 room_id,
461 weak_room,
462 state,
463 update_sender,
464 pagination_batch_token_notifier: Default::default(),
465 auto_shrink_sender,
466 shared_pagination_status,
467 }
468 }
469
470 fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
471 if account_data.is_empty() {
472 return;
473 }
474
475 let mut handled_read_marker = false;
476
477 trace!("Handling account data");
478
479 for raw_event in account_data {
480 match raw_event.deserialize() {
481 Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
482 if handled_read_marker {
485 continue;
486 }
487
488 handled_read_marker = true;
489
490 self.update_sender.send(
492 RoomEventCacheUpdate::MoveReadMarkerTo { event_id: ev.content.event_id },
493 None,
494 );
495 }
496
497 Ok(_) => {
498 }
501
502 Err(e) => {
503 let event_type = raw_event.get_field::<String>("type").ok().flatten();
504 warn!(event_type, "Failed to deserialize account data: {e}");
505 }
506 }
507 }
508 }
509
510 async fn handle_timeline(
513 &self,
514 timeline: Timeline,
515 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
516 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
517 ) -> Result<()> {
518 if timeline.events.is_empty()
519 && timeline.prev_batch.is_none()
520 && ephemeral_events.is_empty()
521 && ambiguity_changes.is_empty()
522 {
523 return Ok(());
524 }
525
526 trace!("adding new events");
528
529 let (stored_prev_batch_token, timeline_event_diffs) =
530 self.state.write().await?.handle_sync(timeline, &ephemeral_events).await?;
531
532 if stored_prev_batch_token {
535 self.pagination_batch_token_notifier.notify_one();
536 }
537
538 if !timeline_event_diffs.is_empty() {
541 self.update_sender.send(
542 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
543 diffs: timeline_event_diffs,
544 origin: EventsOrigin::Sync,
545 }),
546 Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
547 );
548 }
549
550 if !ephemeral_events.is_empty() {
551 self.update_sender
552 .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events }, None);
553 }
554
555 if !ambiguity_changes.is_empty() {
556 self.update_sender
557 .send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes }, None);
558 }
559
560 Ok(())
561 }
562}
563
564#[derive(Clone, Copy)]
565pub(in super::super) enum PostProcessingOrigin {
566 Sync,
567 Backpagination,
568 #[cfg(feature = "e2e-encryption")]
569 Redecryption,
570}
571
572#[cfg(test)]
573mod tests {
574 use matrix_sdk_base::{RoomState, event_cache::Event};
575 use matrix_sdk_test::{async_test, event_factory::EventFactory};
576 use ruma::{
577 RoomId, event_id,
578 events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
579 room_id, user_id,
580 };
581
582 use crate::test_utils::logged_in_client;
583
584 #[async_test]
585 async fn test_find_event_by_id_with_edit_relation() {
586 let original_id = event_id!("$original");
587 let related_id = event_id!("$related");
588 let room_id = room_id!("!galette:saucisse.bzh");
589 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
590
591 assert_relations(
592 room_id,
593 f.text_msg("Original event").event_id(original_id).into(),
594 f.text_msg("* An edited event")
595 .edit(
596 original_id,
597 RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
598 )
599 .event_id(related_id)
600 .into(),
601 f,
602 )
603 .await;
604 }
605
606 #[async_test]
607 async fn test_find_event_by_id_with_thread_reply_relation() {
608 let original_id = event_id!("$original");
609 let related_id = event_id!("$related");
610 let room_id = room_id!("!galette:saucisse.bzh");
611 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
612
613 assert_relations(
614 room_id,
615 f.text_msg("Original event").event_id(original_id).into(),
616 f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
617 f,
618 )
619 .await;
620 }
621
622 #[async_test]
623 async fn test_find_event_by_id_with_reaction_relation() {
624 let original_id = event_id!("$original");
625 let related_id = event_id!("$related");
626 let room_id = room_id!("!galette:saucisse.bzh");
627 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
628
629 assert_relations(
630 room_id,
631 f.text_msg("Original event").event_id(original_id).into(),
632 f.reaction(original_id, ":D").event_id(related_id).into(),
633 f,
634 )
635 .await;
636 }
637
638 #[async_test]
639 async fn test_find_event_by_id_with_poll_response_relation() {
640 let original_id = event_id!("$original");
641 let related_id = event_id!("$related");
642 let room_id = room_id!("!galette:saucisse.bzh");
643 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
644
645 assert_relations(
646 room_id,
647 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
648 .event_id(original_id)
649 .into(),
650 f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
651 f,
652 )
653 .await;
654 }
655
656 #[async_test]
657 async fn test_find_event_by_id_with_poll_end_relation() {
658 let original_id = event_id!("$original");
659 let related_id = event_id!("$related");
660 let room_id = room_id!("!galette:saucisse.bzh");
661 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
662
663 assert_relations(
664 room_id,
665 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
666 .event_id(original_id)
667 .into(),
668 f.poll_end("Poll ended", original_id).event_id(related_id).into(),
669 f,
670 )
671 .await;
672 }
673
674 #[async_test]
675 async fn test_find_event_by_id_with_filtered_relationships() {
676 let original_id = event_id!("$original");
677 let related_id = event_id!("$related");
678 let associated_related_id = event_id!("$recursive_related");
679 let room_id = room_id!("!galette:saucisse.bzh");
680 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
681
682 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
683 let related_event = event_factory
684 .text_msg("* Edited event")
685 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
686 .event_id(related_id)
687 .into();
688 let associated_related_event =
689 event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
690
691 let client = logged_in_client(None).await;
692
693 let event_cache = client.event_cache();
694 event_cache.subscribe().unwrap();
695
696 client.base_client().get_or_create_room(room_id, RoomState::Joined);
697 let room = client.get_room(room_id).unwrap();
698
699 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
700
701 room_event_cache.save_events([original_event]).await;
703
704 room_event_cache.save_events([related_event]).await;
706
707 room_event_cache.save_events([associated_related_event]).await;
709
710 let filter = Some(vec![RelationType::Replacement]);
711 let (event, related_events) = room_event_cache
712 .find_event_with_relations(original_id, filter)
713 .await
714 .expect("Failed to find the event with relations")
715 .expect("Event has no relation");
716 let cached_event_id = event.event_id().unwrap();
718 assert_eq!(cached_event_id, original_id);
719
720 assert_eq!(related_events.len(), 1);
722
723 let related_event_id = related_events[0].event_id().unwrap();
724 assert_eq!(related_event_id, related_id);
725
726 let filter = Some(vec![RelationType::Thread]);
728 let (event, related_events) = room_event_cache
729 .find_event_with_relations(original_id, filter)
730 .await
731 .expect("Failed to find the event with relations")
732 .expect("Event has no relation");
733
734 let cached_event_id = event.event_id().unwrap();
736 assert_eq!(cached_event_id, original_id);
737 assert!(related_events.is_empty());
739 }
740
741 #[async_test]
742 async fn test_find_event_by_id_with_recursive_relation() {
743 let original_id = event_id!("$original");
744 let related_id = event_id!("$related");
745 let associated_related_id = event_id!("$recursive_related");
746 let room_id = room_id!("!galette:saucisse.bzh");
747 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
748
749 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
750 let related_event = event_factory
751 .text_msg("* Edited event")
752 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
753 .event_id(related_id)
754 .into();
755 let associated_related_event =
756 event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
757
758 let client = logged_in_client(None).await;
759
760 let event_cache = client.event_cache();
761 event_cache.subscribe().unwrap();
762
763 client.base_client().get_or_create_room(room_id, RoomState::Joined);
764 let room = client.get_room(room_id).unwrap();
765
766 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
767
768 room_event_cache.save_events([original_event]).await;
770
771 room_event_cache.save_events([related_event]).await;
773
774 room_event_cache.save_events([associated_related_event]).await;
776
777 let (event, related_events) = room_event_cache
778 .find_event_with_relations(original_id, None)
779 .await
780 .expect("Failed to find the event with relations")
781 .expect("Event has no relation");
782 let cached_event_id = event.event_id().unwrap();
784 assert_eq!(cached_event_id, original_id);
785
786 assert_eq!(related_events.len(), 2);
788
789 let related_event_id = related_events[0].event_id().unwrap();
790 assert_eq!(related_event_id, related_id);
791 let related_event_id = related_events[1].event_id().unwrap();
792 assert_eq!(related_event_id, associated_related_id);
793 }
794
795 async fn assert_relations(
796 room_id: &RoomId,
797 original_event: Event,
798 related_event: Event,
799 event_factory: EventFactory,
800 ) {
801 let client = logged_in_client(None).await;
802
803 let event_cache = client.event_cache();
804 event_cache.subscribe().unwrap();
805
806 client.base_client().get_or_create_room(room_id, RoomState::Joined);
807 let room = client.get_room(room_id).unwrap();
808
809 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
810
811 let original_event_id = original_event.event_id().unwrap();
813 room_event_cache.save_events([original_event]).await;
814
815 let unrelated_id = event_id!("$2");
817 room_event_cache
818 .save_events([event_factory
819 .text_msg("An unrelated event")
820 .event_id(unrelated_id)
821 .into()])
822 .await;
823
824 let related_id = related_event.event_id().unwrap();
826 room_event_cache.save_events([related_event]).await;
827
828 let (event, related_events) = room_event_cache
829 .find_event_with_relations(&original_event_id, None)
830 .await
831 .expect("Failed to find the event with relations")
832 .expect("Event has no relation");
833 let cached_event_id = event.event_id().unwrap();
835 assert_eq!(cached_event_id, original_event_id);
836
837 let related_event_id = related_events[0].event_id().unwrap();
839 assert_eq!(related_event_id, related_id);
840 }
841}
842
843#[cfg(all(test, not(target_family = "wasm")))] mod timed_tests {
845 use std::{ops::Not, sync::Arc};
846
847 use assert_matches::assert_matches;
848 use assert_matches2::assert_let;
849 use eyeball_im::VectorDiff;
850 use futures_util::FutureExt;
851 use matrix_sdk_base::{
852 RoomState,
853 event_cache::{
854 Gap,
855 store::{EventCacheStore as _, MemoryStore},
856 },
857 linked_chunk::{
858 ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
859 lazy_loader::from_all_chunks,
860 },
861 store::StoreConfig,
862 sync::{JoinedRoomUpdate, Timeline},
863 };
864 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
865 use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
866 use ruma::{
867 EventId, OwnedUserId, event_id,
868 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
869 room_id,
870 serde::Raw,
871 user_id,
872 };
873 use serde_json::json;
874 use tokio::task::yield_now;
875
876 use super::{
877 super::{
878 super::TimelineVectorDiffs, lock::Reload as _,
879 pagination::LoadMoreEventsBackwardsOutcome,
880 },
881 RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
882 };
883 use crate::{assert_let_timeout, test_utils::client::MockClientBuilder};
884
885 #[async_test]
886 async fn test_write_to_storage() {
887 let room_id = room_id!("!galette:saucisse.bzh");
888 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
889
890 let event_cache_store = Arc::new(MemoryStore::new());
891
892 let client = MockClientBuilder::new(None)
893 .on_builder(|builder| {
894 builder.store_config(
895 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
896 .event_cache_store(event_cache_store.clone()),
897 )
898 })
899 .build()
900 .await;
901
902 let event_cache = client.event_cache();
903
904 event_cache.subscribe().unwrap();
906
907 client.base_client().get_or_create_room(room_id, RoomState::Joined);
908 let room = client.get_room(room_id).unwrap();
909
910 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
911 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
912
913 let timeline = Timeline {
915 limited: true,
916 prev_batch: Some("raclette".to_owned()),
917 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
918 };
919
920 room_event_cache
921 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
922 .await
923 .unwrap();
924
925 assert_matches!(
927 generic_stream.recv().await,
928 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
929 assert_eq!(expected_room_id, room_id);
930 }
931 );
932 assert!(generic_stream.is_empty());
933
934 let linked_chunk = from_all_chunks::<3, _, _>(
936 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
937 )
938 .unwrap()
939 .unwrap();
940
941 assert_eq!(linked_chunk.chunks().count(), 2);
942
943 let mut chunks = linked_chunk.chunks();
944
945 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
947 assert_eq!(gap.token, "raclette");
948 });
949
950 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
952 assert_eq!(events.len(), 1);
953 let deserialized = events[0].raw().deserialize().unwrap();
954 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
955 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
956 });
957
958 assert!(chunks.next().is_none());
960 }
961
962 #[async_test]
963 async fn test_write_to_storage_strips_bundled_relations() {
964 let room_id = room_id!("!galette:saucisse.bzh");
965 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
966
967 let event_cache_store = Arc::new(MemoryStore::new());
968
969 let client = MockClientBuilder::new(None)
970 .on_builder(|builder| {
971 builder.store_config(
972 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
973 .event_cache_store(event_cache_store.clone()),
974 )
975 })
976 .build()
977 .await;
978
979 let event_cache = client.event_cache();
980
981 event_cache.subscribe().unwrap();
983
984 client.base_client().get_or_create_room(room_id, RoomState::Joined);
985 let room = client.get_room(room_id).unwrap();
986
987 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
988 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
989
990 let ev = f
992 .text_msg("hey yo")
993 .sender(*ALICE)
994 .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
995 .into_event();
996
997 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
998
999 room_event_cache
1000 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1001 .await
1002 .unwrap();
1003
1004 assert_matches!(
1006 generic_stream.recv().await,
1007 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1008 assert_eq!(expected_room_id, room_id);
1009 }
1010 );
1011 assert!(generic_stream.is_empty());
1012
1013 {
1015 let events = room_event_cache.events().await.unwrap();
1016
1017 assert_eq!(events.len(), 1);
1018
1019 let ev = events[0].raw().deserialize().unwrap();
1020 assert_let!(
1021 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
1022 );
1023
1024 let original = msg.as_original().unwrap();
1025 assert_eq!(original.content.body(), "hey yo");
1026 assert!(original.unsigned.relations.replace.is_some());
1027 }
1028
1029 let linked_chunk = from_all_chunks::<3, _, _>(
1031 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
1032 )
1033 .unwrap()
1034 .unwrap();
1035
1036 assert_eq!(linked_chunk.chunks().count(), 1);
1037
1038 let mut chunks = linked_chunk.chunks();
1039 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1040 assert_eq!(events.len(), 1);
1041
1042 let ev = events[0].raw().deserialize().unwrap();
1043 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
1044
1045 let original = msg.as_original().unwrap();
1046 assert_eq!(original.content.body(), "hey yo");
1047 assert!(original.unsigned.relations.replace.is_none());
1048 });
1049
1050 assert!(chunks.next().is_none());
1052 }
1053
1054 #[async_test]
1055 async fn test_clear() {
1056 let room_id = room_id!("!galette:saucisse.bzh");
1057 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1058
1059 let event_cache_store = Arc::new(MemoryStore::new());
1060
1061 let event_id1 = event_id!("$1");
1062 let event_id2 = event_id!("$2");
1063
1064 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1065 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1066
1067 event_cache_store
1069 .handle_linked_chunk_updates(
1070 LinkedChunkId::Room(room_id),
1071 vec![
1072 Update::NewItemsChunk {
1074 previous: None,
1075 new: ChunkIdentifier::new(0),
1076 next: None,
1077 },
1078 Update::NewGapChunk {
1080 previous: Some(ChunkIdentifier::new(0)),
1081 new: ChunkIdentifier::new(42),
1083 next: None,
1084 gap: Gap { token: "comté".to_owned() },
1085 },
1086 Update::NewItemsChunk {
1088 previous: Some(ChunkIdentifier::new(42)),
1089 new: ChunkIdentifier::new(1),
1090 next: None,
1091 },
1092 Update::PushItems {
1093 at: Position::new(ChunkIdentifier::new(1), 0),
1094 items: vec![ev1.clone()],
1095 },
1096 Update::NewItemsChunk {
1098 previous: Some(ChunkIdentifier::new(1)),
1099 new: ChunkIdentifier::new(2),
1100 next: None,
1101 },
1102 Update::PushItems {
1103 at: Position::new(ChunkIdentifier::new(2), 0),
1104 items: vec![ev2.clone()],
1105 },
1106 ],
1107 )
1108 .await
1109 .unwrap();
1110
1111 let client = MockClientBuilder::new(None)
1112 .on_builder(|builder| {
1113 builder.store_config(
1114 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1115 .event_cache_store(event_cache_store.clone()),
1116 )
1117 })
1118 .build()
1119 .await;
1120
1121 let event_cache = client.event_cache();
1122
1123 event_cache.subscribe().unwrap();
1125
1126 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1127 let room = client.get_room(room_id).unwrap();
1128
1129 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1130
1131 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
1132 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1133
1134 {
1136 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1137 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
1138 }
1139
1140 {
1142 assert_eq!(items.len(), 1);
1144 assert_eq!(items[0].event_id().unwrap(), event_id2);
1145
1146 assert!(stream.is_empty());
1147 }
1148
1149 {
1151 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1152
1153 assert_let_timeout!(
1154 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1155 stream.recv()
1156 );
1157 assert_eq!(diffs.len(), 1);
1158 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1159 assert_eq!(event.event_id().unwrap(), event_id1);
1161 });
1162
1163 assert!(stream.is_empty());
1164
1165 assert_let_timeout!(
1166 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
1167 generic_stream.recv()
1168 );
1169 assert_eq!(room_id, expected_room_id);
1170 assert!(generic_stream.is_empty());
1171 }
1172
1173 room_event_cache.clear().await.unwrap();
1175
1176 assert_let_timeout!(
1178 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1179 stream.recv()
1180 );
1181 assert_eq!(diffs.len(), 1);
1182 assert_let!(VectorDiff::Clear = &diffs[0]);
1183
1184 assert_let_timeout!(
1186 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
1187 );
1188 assert_eq!(received_room_id, room_id);
1189 assert!(generic_stream.is_empty());
1190
1191 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1194
1195 let items = room_event_cache.events().await.unwrap();
1197 assert!(items.is_empty());
1198
1199 let linked_chunk = from_all_chunks::<3, _, _>(
1201 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
1202 )
1203 .unwrap()
1204 .unwrap();
1205
1206 assert_eq!(linked_chunk.num_items(), 0);
1210 }
1211
1212 #[async_test]
1213 async fn test_load_from_storage() {
1214 let room_id = room_id!("!galette:saucisse.bzh");
1215 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1216
1217 let event_cache_store = Arc::new(MemoryStore::new());
1218
1219 let event_id1 = event_id!("$1");
1220 let event_id2 = event_id!("$2");
1221
1222 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1223 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1224
1225 event_cache_store
1227 .handle_linked_chunk_updates(
1228 LinkedChunkId::Room(room_id),
1229 vec![
1230 Update::NewItemsChunk {
1232 previous: None,
1233 new: ChunkIdentifier::new(0),
1234 next: None,
1235 },
1236 Update::NewGapChunk {
1238 previous: Some(ChunkIdentifier::new(0)),
1239 new: ChunkIdentifier::new(42),
1241 next: None,
1242 gap: Gap { token: "cheddar".to_owned() },
1243 },
1244 Update::NewItemsChunk {
1246 previous: Some(ChunkIdentifier::new(42)),
1247 new: ChunkIdentifier::new(1),
1248 next: None,
1249 },
1250 Update::PushItems {
1251 at: Position::new(ChunkIdentifier::new(1), 0),
1252 items: vec![ev1.clone()],
1253 },
1254 Update::NewItemsChunk {
1256 previous: Some(ChunkIdentifier::new(1)),
1257 new: ChunkIdentifier::new(2),
1258 next: None,
1259 },
1260 Update::PushItems {
1261 at: Position::new(ChunkIdentifier::new(2), 0),
1262 items: vec![ev2.clone()],
1263 },
1264 ],
1265 )
1266 .await
1267 .unwrap();
1268
1269 let client = MockClientBuilder::new(None)
1270 .on_builder(|builder| {
1271 builder.store_config(
1272 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1273 .event_cache_store(event_cache_store.clone()),
1274 )
1275 })
1276 .build()
1277 .await;
1278
1279 let event_cache = client.event_cache();
1280
1281 event_cache.subscribe().unwrap();
1283
1284 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1286
1287 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1288 let room = client.get_room(room_id).unwrap();
1289
1290 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1291
1292 assert_matches!(
1295 generic_stream.recv().await,
1296 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1297 assert_eq!(room_id, expected_room_id);
1298 }
1299 );
1300 assert!(generic_stream.is_empty());
1301
1302 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
1303
1304 assert_eq!(items.len(), 1);
1307 assert_eq!(items[0].event_id().unwrap(), event_id2);
1308 assert!(stream.is_empty());
1309
1310 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1312 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
1313
1314 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1316
1317 assert_let_timeout!(
1318 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1319 stream.recv()
1320 );
1321 assert_eq!(diffs.len(), 1);
1322 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1323 assert_eq!(event.event_id().unwrap(), event_id1);
1324 });
1325
1326 assert!(stream.is_empty());
1327
1328 assert_matches!(
1330 generic_stream.recv().await,
1331 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1332 assert_eq!(expected_room_id, room_id);
1333 }
1334 );
1335 assert!(generic_stream.is_empty());
1336
1337 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
1339
1340 room_event_cache
1341 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1342 .await
1343 .unwrap();
1344
1345 assert!(generic_stream.recv().now_or_never().is_none());
1348
1349 let items = room_event_cache.events().await.unwrap();
1354 assert_eq!(items.len(), 2);
1355 assert_eq!(items[0].event_id().unwrap(), event_id1);
1356 assert_eq!(items[1].event_id().unwrap(), event_id2);
1357 }
1358
1359 #[async_test]
1360 async fn test_load_from_storage_resilient_to_failure() {
1361 let room_id = room_id!("!fondue:patate.ch");
1362 let event_cache_store = Arc::new(MemoryStore::new());
1363
1364 let event = EventFactory::new()
1365 .room(room_id)
1366 .sender(user_id!("@ben:saucisse.bzh"))
1367 .text_msg("foo")
1368 .event_id(event_id!("$42"))
1369 .into_event();
1370
1371 event_cache_store
1373 .handle_linked_chunk_updates(
1374 LinkedChunkId::Room(room_id),
1375 vec![
1376 Update::NewItemsChunk {
1377 previous: None,
1378 new: ChunkIdentifier::new(0),
1379 next: None,
1380 },
1381 Update::PushItems {
1382 at: Position::new(ChunkIdentifier::new(0), 0),
1383 items: vec![event],
1384 },
1385 Update::NewItemsChunk {
1386 previous: Some(ChunkIdentifier::new(0)),
1387 new: ChunkIdentifier::new(1),
1388 next: Some(ChunkIdentifier::new(0)),
1389 },
1390 ],
1391 )
1392 .await
1393 .unwrap();
1394
1395 let client = MockClientBuilder::new(None)
1396 .on_builder(|builder| {
1397 builder.store_config(
1398 StoreConfig::new(CrossProcessLockConfig::multi_process("holder"))
1399 .event_cache_store(event_cache_store.clone()),
1400 )
1401 })
1402 .build()
1403 .await;
1404
1405 let event_cache = client.event_cache();
1406
1407 event_cache.subscribe().unwrap();
1409
1410 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1411 let room = client.get_room(room_id).unwrap();
1412
1413 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1414
1415 let items = room_event_cache.events().await.unwrap();
1416
1417 assert!(items.is_empty());
1420
1421 let raw_chunks =
1424 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
1425 assert!(raw_chunks.is_empty());
1426 }
1427
1428 #[async_test]
1429 async fn test_no_useless_gaps() {
1430 let room_id = room_id!("!galette:saucisse.bzh");
1431
1432 let client = MockClientBuilder::new(None).build().await;
1433
1434 let event_cache = client.event_cache();
1435 event_cache.subscribe().unwrap();
1436
1437 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1438 let room = client.get_room(room_id).unwrap();
1439 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1440 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1441
1442 let f = EventFactory::new().room(room_id).sender(*ALICE);
1443
1444 room_event_cache
1447 .handle_joined_room_update(JoinedRoomUpdate {
1448 timeline: Timeline {
1449 limited: true,
1450 prev_batch: Some("raclette".to_owned()),
1451 events: vec![f.text_msg("hey yo").into_event()],
1452 },
1453 ..Default::default()
1454 })
1455 .await
1456 .unwrap();
1457
1458 assert_matches!(
1460 generic_stream.recv().await,
1461 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1462 assert_eq!(expected_room_id, room_id);
1463 }
1464 );
1465 assert!(generic_stream.is_empty());
1466
1467 {
1468 let state = room_event_cache.inner.state.read().await.unwrap();
1469
1470 let mut num_gaps = 0;
1471 let mut num_events = 0;
1472
1473 for c in state.room_linked_chunk().chunks() {
1474 match c.content() {
1475 ChunkContent::Items(items) => num_events += items.len(),
1476 ChunkContent::Gap(_) => num_gaps += 1,
1477 }
1478 }
1479
1480 assert_eq!(num_gaps, 0);
1483 assert_eq!(num_events, 1);
1484 }
1485
1486 assert_matches!(
1488 room_event_cache.pagination().load_more_events_backwards().await.unwrap(),
1489 LoadMoreEventsBackwardsOutcome::Gap { .. }
1490 );
1491
1492 {
1493 let state = room_event_cache.inner.state.read().await.unwrap();
1494
1495 let mut num_gaps = 0;
1496 let mut num_events = 0;
1497
1498 for c in state.room_linked_chunk().chunks() {
1499 match c.content() {
1500 ChunkContent::Items(items) => num_events += items.len(),
1501 ChunkContent::Gap(_) => num_gaps += 1,
1502 }
1503 }
1504
1505 assert_eq!(num_gaps, 1);
1507 assert_eq!(num_events, 1);
1508 }
1509
1510 room_event_cache
1513 .handle_joined_room_update(JoinedRoomUpdate {
1514 timeline: Timeline {
1515 limited: false,
1516 prev_batch: Some("fondue".to_owned()),
1517 events: vec![f.text_msg("sup").into_event()],
1518 },
1519 ..Default::default()
1520 })
1521 .await
1522 .unwrap();
1523
1524 assert_matches!(
1526 generic_stream.recv().await,
1527 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1528 assert_eq!(expected_room_id, room_id);
1529 }
1530 );
1531 assert!(generic_stream.is_empty());
1532
1533 {
1534 let state = room_event_cache.inner.state.read().await.unwrap();
1535
1536 let mut num_gaps = 0;
1537 let mut num_events = 0;
1538
1539 for c in state.room_linked_chunk().chunks() {
1540 match c.content() {
1541 ChunkContent::Items(items) => num_events += items.len(),
1542 ChunkContent::Gap(gap) => {
1543 assert_eq!(gap.token, "raclette");
1544 num_gaps += 1;
1545 }
1546 }
1547 }
1548
1549 assert_eq!(num_gaps, 1);
1551 assert_eq!(num_events, 2);
1552 }
1553 }
1554
1555 #[async_test]
1556 async fn test_shrink_to_last_chunk() {
1557 let room_id = room_id!("!galette:saucisse.bzh");
1558
1559 let client = MockClientBuilder::new(None).build().await;
1560
1561 let f = EventFactory::new().room(room_id);
1562
1563 let evid1 = event_id!("$1");
1564 let evid2 = event_id!("$2");
1565
1566 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1567 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1568
1569 {
1571 client
1572 .event_cache_store()
1573 .lock()
1574 .await
1575 .expect("Could not acquire the event cache lock")
1576 .as_clean()
1577 .expect("Could not acquire a clean event cache lock")
1578 .handle_linked_chunk_updates(
1579 LinkedChunkId::Room(room_id),
1580 vec![
1581 Update::NewItemsChunk {
1582 previous: None,
1583 new: ChunkIdentifier::new(0),
1584 next: None,
1585 },
1586 Update::PushItems {
1587 at: Position::new(ChunkIdentifier::new(0), 0),
1588 items: vec![ev1],
1589 },
1590 Update::NewItemsChunk {
1591 previous: Some(ChunkIdentifier::new(0)),
1592 new: ChunkIdentifier::new(1),
1593 next: None,
1594 },
1595 Update::PushItems {
1596 at: Position::new(ChunkIdentifier::new(1), 0),
1597 items: vec![ev2],
1598 },
1599 ],
1600 )
1601 .await
1602 .unwrap();
1603 }
1604
1605 let event_cache = client.event_cache();
1606 event_cache.subscribe().unwrap();
1607
1608 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1609 let room = client.get_room(room_id).unwrap();
1610 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1611
1612 let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
1614 assert_eq!(events.len(), 1);
1615 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1616 assert!(stream.is_empty());
1617
1618 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1619
1620 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1622 assert_eq!(outcome.events.len(), 1);
1623 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1624 assert!(outcome.reached_start);
1625
1626 assert_let_timeout!(
1628 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1629 stream.recv()
1630 );
1631 assert_eq!(diffs.len(), 1);
1632 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1633 assert_eq!(value.event_id().as_deref(), Some(evid1));
1634 });
1635
1636 assert!(stream.is_empty());
1637
1638 assert_let_timeout!(
1640 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1641 );
1642 assert_eq!(expected_room_id, room_id);
1643 assert!(generic_stream.is_empty());
1644
1645 room_event_cache
1647 .inner
1648 .state
1649 .write()
1650 .await
1651 .unwrap()
1652 .reload()
1653 .await
1654 .expect("shrinking should succeed");
1655
1656 assert_let_timeout!(
1658 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1659 stream.recv()
1660 );
1661 assert_eq!(diffs.len(), 2);
1662 assert_matches!(&diffs[0], VectorDiff::Clear);
1663 assert_matches!(&diffs[1], VectorDiff::Append { values} => {
1664 assert_eq!(values.len(), 1);
1665 assert_eq!(values[0].event_id().as_deref(), Some(evid2));
1666 });
1667
1668 assert!(stream.is_empty());
1669
1670 assert_let_timeout!(Ok(RoomEventCacheGenericUpdate { .. }) = generic_stream.recv());
1672 assert!(generic_stream.is_empty());
1673
1674 let events = room_event_cache.events().await.unwrap();
1676 assert_eq!(events.len(), 1);
1677 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1678
1679 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1682 assert_eq!(outcome.events.len(), 1);
1683 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1684 assert!(outcome.reached_start);
1685 }
1686
1687 #[async_test]
1688 async fn test_room_ordering() {
1689 let room_id = room_id!("!galette:saucisse.bzh");
1690
1691 let client = MockClientBuilder::new(None).build().await;
1692
1693 let f = EventFactory::new().room(room_id).sender(*ALICE);
1694
1695 let evid1 = event_id!("$1");
1696 let evid2 = event_id!("$2");
1697 let evid3 = event_id!("$3");
1698
1699 let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
1700 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1701 let ev3 = f.text_msg("yo").event_id(evid3).into_event();
1702
1703 {
1705 client
1706 .event_cache_store()
1707 .lock()
1708 .await
1709 .expect("Could not acquire the event cache lock")
1710 .as_clean()
1711 .expect("Could not acquire a clean event cache lock")
1712 .handle_linked_chunk_updates(
1713 LinkedChunkId::Room(room_id),
1714 vec![
1715 Update::NewItemsChunk {
1716 previous: None,
1717 new: ChunkIdentifier::new(0),
1718 next: None,
1719 },
1720 Update::PushItems {
1721 at: Position::new(ChunkIdentifier::new(0), 0),
1722 items: vec![ev1, ev2],
1723 },
1724 Update::NewItemsChunk {
1725 previous: Some(ChunkIdentifier::new(0)),
1726 new: ChunkIdentifier::new(1),
1727 next: None,
1728 },
1729 Update::PushItems {
1730 at: Position::new(ChunkIdentifier::new(1), 0),
1731 items: vec![ev3.clone()],
1732 },
1733 ],
1734 )
1735 .await
1736 .unwrap();
1737 }
1738
1739 let event_cache = client.event_cache();
1740 event_cache.subscribe().unwrap();
1741
1742 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1743 let room = client.get_room(room_id).unwrap();
1744 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1745
1746 {
1749 let state = room_event_cache.inner.state.read().await.unwrap();
1750 let room_linked_chunk = state.room_linked_chunk();
1751
1752 assert_eq!(
1754 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1755 Some(0)
1756 );
1757
1758 assert_eq!(
1760 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1761 Some(1)
1762 );
1763
1764 let mut events = room_linked_chunk.events();
1766 let (pos, ev) = events.next().unwrap();
1767 assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
1768 assert_eq!(ev.event_id().as_deref(), Some(evid3));
1769 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1770
1771 assert!(events.next().is_none());
1773 }
1774
1775 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1777 assert!(outcome.reached_start);
1778
1779 {
1782 let state = room_event_cache.inner.state.read().await.unwrap();
1783 let room_linked_chunk = state.room_linked_chunk();
1784
1785 for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
1786 assert_eq!(room_linked_chunk.event_order(pos), Some(i));
1787 }
1788 }
1789
1790 let evid4 = event_id!("$4");
1795 room_event_cache
1796 .handle_joined_room_update(JoinedRoomUpdate {
1797 timeline: Timeline {
1798 limited: true,
1799 prev_batch: Some("fondue".to_owned()),
1800 events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
1801 },
1802 ..Default::default()
1803 })
1804 .await
1805 .unwrap();
1806
1807 {
1808 let state = room_event_cache.inner.state.read().await.unwrap();
1809 let room_linked_chunk = state.room_linked_chunk();
1810
1811 let mut events = room_linked_chunk.events();
1813
1814 let (pos, ev) = events.next().unwrap();
1815 assert_eq!(ev.event_id().as_deref(), Some(evid3));
1816 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1817
1818 let (pos, ev) = events.next().unwrap();
1819 assert_eq!(ev.event_id().as_deref(), Some(evid4));
1820 assert_eq!(room_linked_chunk.event_order(pos), Some(3));
1821
1822 assert!(events.next().is_none());
1824
1825 assert_eq!(
1827 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1828 Some(0)
1829 );
1830 assert_eq!(
1831 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1832 Some(1)
1833 );
1834
1835 assert_eq!(
1838 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
1839 None
1840 );
1841 }
1842 }
1843
1844 #[async_test]
1845 async fn test_auto_shrink_after_all_subscribers_are_gone() {
1846 let room_id = room_id!("!galette:saucisse.bzh");
1847
1848 let client = MockClientBuilder::new(None).build().await;
1849
1850 let f = EventFactory::new().room(room_id);
1851
1852 let evid1 = event_id!("$1");
1853 let evid2 = event_id!("$2");
1854
1855 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1856 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1857
1858 {
1860 client
1861 .event_cache_store()
1862 .lock()
1863 .await
1864 .expect("Could not acquire the event cache lock")
1865 .as_clean()
1866 .expect("Could not acquire a clean event cache lock")
1867 .handle_linked_chunk_updates(
1868 LinkedChunkId::Room(room_id),
1869 vec![
1870 Update::NewItemsChunk {
1871 previous: None,
1872 new: ChunkIdentifier::new(0),
1873 next: None,
1874 },
1875 Update::PushItems {
1876 at: Position::new(ChunkIdentifier::new(0), 0),
1877 items: vec![ev1],
1878 },
1879 Update::NewItemsChunk {
1880 previous: Some(ChunkIdentifier::new(0)),
1881 new: ChunkIdentifier::new(1),
1882 next: None,
1883 },
1884 Update::PushItems {
1885 at: Position::new(ChunkIdentifier::new(1), 0),
1886 items: vec![ev2],
1887 },
1888 ],
1889 )
1890 .await
1891 .unwrap();
1892 }
1893
1894 let event_cache = client.event_cache();
1895 event_cache.subscribe().unwrap();
1896
1897 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1898 let room = client.get_room(room_id).unwrap();
1899 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1900
1901 let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
1903 assert_eq!(events1.len(), 1);
1904 assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
1905 assert!(stream1.is_empty());
1906
1907 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1908
1909 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1911 assert_eq!(outcome.events.len(), 1);
1912 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1913 assert!(outcome.reached_start);
1914
1915 assert_let_timeout!(
1918 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1919 stream1.recv()
1920 );
1921 assert_eq!(diffs.len(), 1);
1922 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1923 assert_eq!(value.event_id().as_deref(), Some(evid1));
1924 });
1925
1926 assert!(stream1.is_empty());
1927
1928 assert_let_timeout!(
1929 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1930 );
1931 assert_eq!(expected_room_id, room_id);
1932 assert!(generic_stream.is_empty());
1933
1934 let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
1938 assert_eq!(events2.len(), 2);
1939 assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
1940 assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
1941 assert!(stream2.is_empty());
1942
1943 drop(stream1);
1945 yield_now().await;
1946
1947 assert!(stream2.is_empty());
1949
1950 drop(stream2);
1952 yield_now().await;
1953
1954 {
1957 let state = room_event_cache.inner.state.read().await.unwrap();
1959 assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
1960 }
1961
1962 let events3 = room_event_cache.events().await.unwrap();
1964 assert_eq!(events3.len(), 1);
1965 assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
1966 }
1967
1968 #[async_test]
1969 async fn test_rfind_map_event_in_memory_by() {
1970 let user_id = user_id!("@mnt_io:matrix.org");
1971 let room_id = room_id!("!raclette:patate.ch");
1972 let client = MockClientBuilder::new(None).build().await;
1973
1974 let event_factory = EventFactory::new().room(room_id);
1975
1976 let event_id_0 = event_id!("$ev0");
1977 let event_id_1 = event_id!("$ev1");
1978 let event_id_2 = event_id!("$ev2");
1979 let event_id_3 = event_id!("$ev3");
1980
1981 let event_0 =
1982 event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
1983 let event_1 =
1984 event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
1985 let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
1986 let event_3 =
1987 event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
1988
1989 {
1992 client
1993 .event_cache_store()
1994 .lock()
1995 .await
1996 .expect("Could not acquire the event cache lock")
1997 .as_clean()
1998 .expect("Could not acquire a clean event cache lock")
1999 .handle_linked_chunk_updates(
2000 LinkedChunkId::Room(room_id),
2001 vec![
2002 Update::NewItemsChunk {
2003 previous: None,
2004 new: ChunkIdentifier::new(0),
2005 next: None,
2006 },
2007 Update::PushItems {
2008 at: Position::new(ChunkIdentifier::new(0), 0),
2009 items: vec![event_3],
2010 },
2011 Update::NewItemsChunk {
2012 previous: Some(ChunkIdentifier::new(0)),
2013 new: ChunkIdentifier::new(1),
2014 next: None,
2015 },
2016 Update::PushItems {
2017 at: Position::new(ChunkIdentifier::new(1), 0),
2018 items: vec![event_0, event_1, event_2],
2019 },
2020 ],
2021 )
2022 .await
2023 .unwrap();
2024 }
2025
2026 let event_cache = client.event_cache();
2027 event_cache.subscribe().unwrap();
2028
2029 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2030 let room = client.get_room(room_id).unwrap();
2031 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2032
2033 assert_matches!(
2035 room_event_cache
2036 .rfind_map_event_in_memory_by(|event| {
2037 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| event.event_id())
2038 })
2039 .await,
2040 Ok(Some(event_id)) => {
2041 assert_eq!(event_id.as_deref(), Some(event_id_0));
2042 }
2043 );
2044
2045 assert_matches!(
2048 room_event_cache
2049 .rfind_map_event_in_memory_by(|event| {
2050 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| event.event_id())
2051 })
2052 .await,
2053 Ok(Some(event_id)) => {
2054 assert_eq!(event_id.as_deref(), Some(event_id_2));
2055 }
2056 );
2057
2058 assert!(
2060 room_event_cache
2061 .rfind_map_event_in_memory_by(|event| {
2062 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
2063 == Some(user_id))
2064 .then(|| event.event_id())
2065 })
2066 .await
2067 .unwrap()
2068 .is_none()
2069 );
2070
2071 assert!(
2073 room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap().is_none()
2074 );
2075 }
2076
2077 #[async_test]
2078 async fn test_reload_when_dirty() {
2079 let user_id = user_id!("@mnt_io:matrix.org");
2080 let room_id = room_id!("!raclette:patate.ch");
2081
2082 let event_cache_store = MemoryStore::new();
2084
2085 let client_p0 = MockClientBuilder::new(None)
2087 .on_builder(|builder| {
2088 builder.store_config(
2089 StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2090 .event_cache_store(event_cache_store.clone()),
2091 )
2092 })
2093 .build()
2094 .await;
2095
2096 let client_p1 = MockClientBuilder::new(None)
2098 .on_builder(|builder| {
2099 builder.store_config(
2100 StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2101 .event_cache_store(event_cache_store),
2102 )
2103 })
2104 .build()
2105 .await;
2106
2107 let event_factory = EventFactory::new().room(room_id).sender(user_id);
2108
2109 let ev_id_0 = event_id!("$ev_0");
2110 let ev_id_1 = event_id!("$ev_1");
2111
2112 let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
2113 let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
2114
2115 client_p0
2117 .event_cache_store()
2118 .lock()
2119 .await
2120 .expect("[p0] Could not acquire the event cache lock")
2121 .as_clean()
2122 .expect("[p0] Could not acquire a clean event cache lock")
2123 .handle_linked_chunk_updates(
2124 LinkedChunkId::Room(room_id),
2125 vec![
2126 Update::NewItemsChunk {
2127 previous: None,
2128 new: ChunkIdentifier::new(0),
2129 next: None,
2130 },
2131 Update::PushItems {
2132 at: Position::new(ChunkIdentifier::new(0), 0),
2133 items: vec![ev_0],
2134 },
2135 Update::NewItemsChunk {
2136 previous: Some(ChunkIdentifier::new(0)),
2137 new: ChunkIdentifier::new(1),
2138 next: None,
2139 },
2140 Update::PushItems {
2141 at: Position::new(ChunkIdentifier::new(1), 0),
2142 items: vec![ev_1],
2143 },
2144 ],
2145 )
2146 .await
2147 .unwrap();
2148
2149 let (room_event_cache_p0, room_event_cache_p1) = {
2151 let event_cache_p0 = client_p0.event_cache();
2152 event_cache_p0.subscribe().unwrap();
2153
2154 let event_cache_p1 = client_p1.event_cache();
2155 event_cache_p1.subscribe().unwrap();
2156
2157 client_p0.base_client().get_or_create_room(room_id, RoomState::Joined);
2158 client_p1.base_client().get_or_create_room(room_id, RoomState::Joined);
2159
2160 let (room_event_cache_p0, _drop_handles) =
2161 client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
2162 let (room_event_cache_p1, _drop_handles) =
2163 client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
2164
2165 (room_event_cache_p0, room_event_cache_p1)
2166 };
2167
2168 let mut updates_stream_p0 = {
2173 let room_event_cache = &room_event_cache_p0;
2174
2175 let (initial_updates, mut updates_stream) =
2176 room_event_cache_p0.subscribe().await.unwrap();
2177
2178 assert_eq!(initial_updates.len(), 1);
2180 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2181 assert!(updates_stream.is_empty());
2182
2183 assert!(event_loaded(room_event_cache, ev_id_1).await);
2185
2186 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2188
2189 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2191
2192 assert_matches!(
2194 updates_stream.recv().await.unwrap(),
2195 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2196 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2197 assert_matches!(
2198 &diffs[0],
2199 VectorDiff::Insert { index: 0, value: event } => {
2200 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2201 }
2202 );
2203 }
2204 );
2205
2206 assert!(event_loaded(room_event_cache, ev_id_0).await);
2208
2209 updates_stream
2210 };
2211
2212 let mut updates_stream_p1 = {
2214 let room_event_cache = &room_event_cache_p1;
2215 let (initial_updates, mut updates_stream) =
2216 room_event_cache_p1.subscribe().await.unwrap();
2217
2218 assert_eq!(initial_updates.len(), 1);
2220 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2221 assert!(updates_stream.is_empty());
2222
2223 assert!(event_loaded(room_event_cache, ev_id_1).await);
2225
2226 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2228
2229 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2231
2232 assert_matches!(
2234 updates_stream.recv().await.unwrap(),
2235 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2236 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2237 assert_matches!(
2238 &diffs[0],
2239 VectorDiff::Insert { index: 0, value: event } => {
2240 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2241 }
2242 );
2243 }
2244 );
2245
2246 assert!(event_loaded(room_event_cache, ev_id_0).await);
2248
2249 updates_stream
2250 };
2251
2252 for _ in 0..3 {
2254 {
2258 let room_event_cache = &room_event_cache_p0;
2259 let updates_stream = &mut updates_stream_p0;
2260
2261 assert!(event_loaded(room_event_cache, ev_id_1).await);
2263
2264 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2267
2268 assert_matches!(
2270 updates_stream.recv().await.unwrap(),
2271 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2272 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2273 assert_matches!(&diffs[0], VectorDiff::Clear);
2274 assert_matches!(
2275 &diffs[1],
2276 VectorDiff::Append { values: events } => {
2277 assert_eq!(events.len(), 1);
2278 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2279 }
2280 );
2281 }
2282 );
2283
2284 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2286
2287 assert!(event_loaded(room_event_cache, ev_id_0).await);
2289
2290 assert_matches!(
2292 updates_stream.recv().await.unwrap(),
2293 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2294 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2295 assert_matches!(
2296 &diffs[0],
2297 VectorDiff::Insert { index: 0, value: event } => {
2298 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2299 }
2300 );
2301 }
2302 );
2303 }
2304
2305 {
2309 let room_event_cache = &room_event_cache_p1;
2310 let updates_stream = &mut updates_stream_p1;
2311
2312 assert!(event_loaded(room_event_cache, ev_id_1).await);
2314
2315 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2318
2319 assert_matches!(
2321 updates_stream.recv().await.unwrap(),
2322 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2323 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2324 assert_matches!(&diffs[0], VectorDiff::Clear);
2325 assert_matches!(
2326 &diffs[1],
2327 VectorDiff::Append { values: events } => {
2328 assert_eq!(events.len(), 1);
2329 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2330 }
2331 );
2332 }
2333 );
2334
2335 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2337
2338 assert!(event_loaded(room_event_cache, ev_id_0).await);
2340
2341 assert_matches!(
2343 updates_stream.recv().await.unwrap(),
2344 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2345 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2346 assert_matches!(
2347 &diffs[0],
2348 VectorDiff::Insert { index: 0, value: event } => {
2349 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2350 }
2351 );
2352 }
2353 );
2354 }
2355 }
2356
2357 for _ in 0..3 {
2360 {
2361 let room_event_cache = &room_event_cache_p0;
2362 let updates_stream = &mut updates_stream_p0;
2363
2364 let guard = room_event_cache.inner.state.read().await.unwrap();
2365
2366 assert!(guard.is_dirty().not());
2372
2373 assert_matches!(
2375 updates_stream.recv().await.unwrap(),
2376 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2377 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2378 assert_matches!(&diffs[0], VectorDiff::Clear);
2379 assert_matches!(
2380 &diffs[1],
2381 VectorDiff::Append { values: events } => {
2382 assert_eq!(events.len(), 1);
2383 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2384 }
2385 );
2386 }
2387 );
2388
2389 assert!(event_loaded(room_event_cache, ev_id_1).await);
2390 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2391
2392 drop(guard);
2398
2399 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2400 assert!(event_loaded(room_event_cache, ev_id_0).await);
2401
2402 assert_matches!(
2404 updates_stream.recv().await.unwrap(),
2405 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2406 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2407 assert_matches!(
2408 &diffs[0],
2409 VectorDiff::Insert { index: 0, value: event } => {
2410 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2411 }
2412 );
2413 }
2414 );
2415 }
2416
2417 {
2418 let room_event_cache = &room_event_cache_p1;
2419 let updates_stream = &mut updates_stream_p1;
2420
2421 let guard = room_event_cache.inner.state.read().await.unwrap();
2422
2423 assert!(guard.is_dirty().not());
2428
2429 assert_matches!(
2431 updates_stream.recv().await.unwrap(),
2432 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2433 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2434 assert_matches!(&diffs[0], VectorDiff::Clear);
2435 assert_matches!(
2436 &diffs[1],
2437 VectorDiff::Append { values: events } => {
2438 assert_eq!(events.len(), 1);
2439 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2440 }
2441 );
2442 }
2443 );
2444
2445 assert!(event_loaded(room_event_cache, ev_id_1).await);
2446 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2447
2448 drop(guard);
2454
2455 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2456 assert!(event_loaded(room_event_cache, ev_id_0).await);
2457
2458 assert_matches!(
2460 updates_stream.recv().await.unwrap(),
2461 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2462 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2463 assert_matches!(
2464 &diffs[0],
2465 VectorDiff::Insert { index: 0, value: event } => {
2466 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2467 }
2468 );
2469 }
2470 );
2471 }
2472 }
2473
2474 for _ in 0..3 {
2476 {
2477 let room_event_cache = &room_event_cache_p0;
2478 let updates_stream = &mut updates_stream_p0;
2479
2480 let guard = room_event_cache.inner.state.write().await.unwrap();
2481
2482 assert!(guard.is_dirty().not());
2484
2485 assert_matches!(
2487 updates_stream.recv().await.unwrap(),
2488 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2489 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2490 assert_matches!(&diffs[0], VectorDiff::Clear);
2491 assert_matches!(
2492 &diffs[1],
2493 VectorDiff::Append { values: events } => {
2494 assert_eq!(events.len(), 1);
2495 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2496 }
2497 );
2498 }
2499 );
2500
2501 drop(guard);
2504
2505 assert!(event_loaded(room_event_cache, ev_id_1).await);
2506 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2507
2508 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2509 assert!(event_loaded(room_event_cache, ev_id_0).await);
2510
2511 assert_matches!(
2513 updates_stream.recv().await.unwrap(),
2514 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2515 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2516 assert_matches!(
2517 &diffs[0],
2518 VectorDiff::Insert { index: 0, value: event } => {
2519 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2520 }
2521 );
2522 }
2523 );
2524 }
2525
2526 {
2527 let room_event_cache = &room_event_cache_p1;
2528 let updates_stream = &mut updates_stream_p1;
2529
2530 let guard = room_event_cache.inner.state.write().await.unwrap();
2531
2532 assert!(guard.is_dirty().not());
2534
2535 assert_matches!(
2537 updates_stream.recv().await.unwrap(),
2538 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2539 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2540 assert_matches!(&diffs[0], VectorDiff::Clear);
2541 assert_matches!(
2542 &diffs[1],
2543 VectorDiff::Append { values: events } => {
2544 assert_eq!(events.len(), 1);
2545 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2546 }
2547 );
2548 }
2549 );
2550
2551 drop(guard);
2554
2555 assert!(event_loaded(room_event_cache, ev_id_1).await);
2556 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2557
2558 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2559 assert!(event_loaded(room_event_cache, ev_id_0).await);
2560
2561 assert_matches!(
2563 updates_stream.recv().await.unwrap(),
2564 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2565 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2566 assert_matches!(
2567 &diffs[0],
2568 VectorDiff::Insert { index: 0, value: event } => {
2569 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2570 }
2571 );
2572 }
2573 );
2574 }
2575 }
2576 }
2577
2578 #[async_test]
2579 async fn test_load_when_dirty() {
2580 let room_id_0 = room_id!("!raclette:patate.ch");
2581 let room_id_1 = room_id!("!morbiflette:patate.ch");
2582
2583 let event_cache_store = MemoryStore::new();
2585
2586 let client_p0 = MockClientBuilder::new(None)
2588 .on_builder(|builder| {
2589 builder.store_config(
2590 StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2591 .event_cache_store(event_cache_store.clone()),
2592 )
2593 })
2594 .build()
2595 .await;
2596
2597 let client_p1 = MockClientBuilder::new(None)
2599 .on_builder(|builder| {
2600 builder.store_config(
2601 StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2602 .event_cache_store(event_cache_store),
2603 )
2604 })
2605 .build()
2606 .await;
2607
2608 let (room_event_cache_0_p0, room_event_cache_0_p1) = {
2610 let event_cache_p0 = client_p0.event_cache();
2611 event_cache_p0.subscribe().unwrap();
2612
2613 let event_cache_p1 = client_p1.event_cache();
2614 event_cache_p1.subscribe().unwrap();
2615
2616 client_p0.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2617 client_p0.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2618
2619 client_p1.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2620 client_p1.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2621
2622 let (room_event_cache_0_p0, _drop_handles) =
2623 client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2624 let (room_event_cache_0_p1, _drop_handles) =
2625 client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2626
2627 (room_event_cache_0_p0, room_event_cache_0_p1)
2628 };
2629
2630 {
2632 drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
2633 drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
2634 }
2635
2636 let (room_event_cache_1_p0, _) =
2640 client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
2641
2642 {
2644 let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
2645 assert!(guard.is_dirty().not());
2646 }
2647
2648 }
2651
2652 #[async_test]
2653 async fn test_uniq_read_marker() {
2654 let client = MockClientBuilder::new(None).build().await;
2655 let room_id = room_id!("!galette:saucisse.bzh");
2656 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2657
2658 let event_cache = client.event_cache();
2659
2660 event_cache.subscribe().unwrap();
2661
2662 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2663 let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
2664 let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
2665
2666 assert!(events.is_empty());
2667
2668 let read_marker_event = Raw::from_json_string(
2670 json!({
2671 "content": {
2672 "event_id": "$crepe:saucisse.bzh"
2673 },
2674 "room_id": "!galette:saucisse.bzh",
2675 "type": "m.fully_read"
2676 })
2677 .to_string(),
2678 )
2679 .unwrap();
2680 let account_data = vec![read_marker_event; 100];
2681
2682 room_event_cache
2683 .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
2684 .await
2685 .unwrap();
2686
2687 assert_matches!(
2689 stream.recv().await.unwrap(),
2690 RoomEventCacheUpdate::MoveReadMarkerTo { .. }
2691 );
2692
2693 assert!(stream.recv().now_or_never().is_none());
2694
2695 assert!(generic_stream.recv().now_or_never().is_none());
2697 }
2698
2699 async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
2700 room_event_cache
2701 .rfind_map_event_in_memory_by(|event| {
2702 (event.event_id().as_deref() == Some(event_id)).then_some(())
2703 })
2704 .await
2705 .unwrap()
2706 .is_some()
2707 }
2708}