1pub mod pagination;
16mod state;
17mod subscriber;
18mod updates;
19
20use std::{
21 collections::BTreeMap,
22 fmt,
23 sync::{Arc, atomic::Ordering},
24};
25
26use eyeball::SharedObservable;
27use matrix_sdk_base::{
28 deserialized_responses::AmbiguityChange,
29 event_cache::Event,
30 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
31};
32use ruma::{
33 EventId, OwnedEventId, OwnedRoomId, RoomId,
34 events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
35 serde::Raw,
36};
37pub(super) use state::{LockedRoomEventCacheState, RoomEventCacheStateLockWriteGuard};
38pub use subscriber::RoomEventCacheSubscriber;
39use tokio::sync::{Notify, broadcast::Receiver, mpsc};
40use tracing::{instrument, trace, warn};
41pub use updates::{
42 RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate,
43 RoomEventCacheUpdateSender,
44};
45
46use super::{
47 super::{AutoShrinkChannelPayload, EventCacheError, EventsOrigin, Result, RoomPagination},
48 TimelineVectorDiffs,
49 event_linked_chunk::sort_positions_descending,
50 thread::pagination::ThreadPagination,
51};
52use crate::{
53 client::WeakClient,
54 event_cache::{
55 EventFocusThreadMode,
56 caches::{event_focused::EventFocusedCache, pagination::SharedPaginationStatus},
57 },
58 room::WeakRoom,
59};
60
61#[derive(Clone)]
65pub struct RoomEventCache {
66 inner: Arc<RoomEventCacheInner>,
67}
68
69impl fmt::Debug for RoomEventCache {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.debug_struct("RoomEventCache").finish_non_exhaustive()
72 }
73}
74
75impl RoomEventCache {
76 pub(super) fn new(
78 room_id: OwnedRoomId,
79 weak_room: WeakRoom,
80 state: LockedRoomEventCacheState,
81 shared_pagination_status: SharedObservable<SharedPaginationStatus>,
82 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
83 update_sender: RoomEventCacheUpdateSender,
84 ) -> Self {
85 Self {
86 inner: Arc::new(RoomEventCacheInner::new(
87 room_id,
88 weak_room,
89 state,
90 shared_pagination_status,
91 auto_shrink_sender,
92 update_sender,
93 )),
94 }
95 }
96
97 pub fn room_id(&self) -> &RoomId {
99 &self.inner.room_id
100 }
101
102 pub async fn events(&self) -> Result<Vec<Event>> {
107 let state = self.inner.state.read().await?;
108
109 Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
110 }
111
112 pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
119 let state = self.inner.state.read().await?;
120 let events =
121 state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
122
123 let subscriber_count = state.subscriber_count();
124 let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
125 trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
126
127 let subscriber = RoomEventCacheSubscriber::new(
128 self.inner.update_sender.new_room_receiver(),
129 self.inner.room_id.clone(),
130 self.inner.auto_shrink_sender.clone(),
131 subscriber_count.clone(),
132 );
133
134 Ok((events, subscriber))
135 }
136
137 pub async fn subscribe_to_thread(
140 &self,
141 thread_root: OwnedEventId,
142 ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
143 let mut state = self.inner.state.write().await?;
144
145 state.subscribe_to_thread(thread_root).await
146 }
147
148 pub async fn subscribe_to_pinned_events(
158 &self,
159 ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
160 let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
161 let state = self.inner.state.read().await?;
162
163 state.subscribe_to_pinned_events(room).await
164 }
165
166 #[instrument(skip(self), fields(room_id = %self.inner.room_id, event_id = %event_id, thread_mode = ?thread_mode))]
180 pub async fn get_or_create_event_focused_cache(
181 &self,
182 event_id: OwnedEventId,
183 num_context_events: u16,
184 thread_mode: EventFocusThreadMode,
185 ) -> Result<EventFocusedCache> {
186 let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
187 let guard = self.inner.state.read().await?;
188
189 if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
191 trace!("the cache was already created, returning it");
192 return Ok(cache);
193 }
194
195 let linked_chunk_update_sender = guard.state.linked_chunk_update_sender.clone();
197
198 drop(guard);
202
203 let room_id = room.room_id().to_owned();
204 let weak_room = WeakRoom::new(WeakClient::from_client(&room.client()), room_id.clone());
205
206 trace!("creating a fresh event-focused cache");
207 let cache = EventFocusedCache::new(weak_room, event_id.clone(), linked_chunk_update_sender);
208
209 cache.start_from(room, num_context_events, thread_mode).await?;
211
212 let mut guard = self.inner.state.write().await?;
213
214 if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
217 trace!("another cache has been racily created, returning it");
218 return Ok(cache);
219 }
220
221 guard.insert_event_focused_cache(event_id, thread_mode, cache.clone());
223
224 Ok(cache)
225 }
226
227 #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
234 pub async fn get_event_focused_cache(
235 &self,
236 event_id: OwnedEventId,
237 thread_mode: EventFocusThreadMode,
238 ) -> Result<Option<EventFocusedCache>> {
239 Ok(self.inner.state.read().await?.get_event_focused_cache(event_id, thread_mode))
240 }
241
242 pub fn pagination(&self) -> RoomPagination {
245 RoomPagination::new(self.inner.clone())
246 }
247
248 pub async fn thread_pagination(&self, thread_id: OwnedEventId) -> Result<ThreadPagination> {
251 Ok(self.inner.state.write().await?.get_or_reload_thread(thread_id).pagination())
252 }
253
254 pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
262 where
263 P: FnMut(&Event) -> Option<O>,
264 {
265 Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
266 }
267
268 pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
273 Ok(self
274 .inner
275 .state
276 .read()
277 .await?
278 .find_event(event_id)
279 .await
280 .ok()
281 .flatten()
282 .map(|(_loc, event)| event))
283 }
284
285 pub async fn find_event_with_relations(
297 &self,
298 event_id: &EventId,
299 filter: Option<Vec<RelationType>>,
300 ) -> Result<Option<(Event, Vec<Event>)>> {
301 Ok(self
303 .inner
304 .state
305 .read()
306 .await?
307 .find_event_with_relations(event_id, filter.clone())
308 .await
309 .ok()
310 .flatten())
311 }
312
313 pub async fn find_event_relations(
325 &self,
326 event_id: &EventId,
327 filter: Option<Vec<RelationType>>,
328 ) -> Result<Vec<Event>> {
329 self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
331 }
332
333 pub async fn clear(&self) -> Result<()> {
338 let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
340
341 self.inner.update_sender.send(
343 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
344 diffs: updates_as_vector_diffs,
345 origin: EventsOrigin::Cache,
346 }),
347 Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
348 );
349
350 Ok(())
351 }
352
353 pub(in super::super) fn state(&self) -> &LockedRoomEventCacheState {
355 &self.inner.state
356 }
357
358 #[instrument(skip_all, fields(room_id = %self.room_id()))]
360 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
361 self.inner
362 .handle_timeline(updates.timeline, updates.ephemeral.clone(), updates.ambiguity_changes)
363 .await?;
364 self.inner.handle_account_data(updates.account_data);
365
366 Ok(())
367 }
368
369 #[instrument(skip_all, fields(room_id = %self.room_id()))]
371 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
372 self.inner.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
373
374 Ok(())
375 }
376
377 pub(in super::super) fn update_sender(&self) -> &RoomEventCacheUpdateSender {
379 &self.inner.update_sender
380 }
381
382 pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
384 self.inner.insert_sent_event_from_send_queue(event).await
385 }
386
387 pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
390 match self.inner.state.write().await {
391 Ok(mut state_guard) => {
392 if let Err(err) = state_guard.save_events(events).await {
393 warn!("couldn't save event in the event cache: {err}");
394 }
395 }
396
397 Err(err) => {
398 warn!("couldn't save event in the event cache: {err}");
399 }
400 }
401 }
402
403 pub async fn debug_string(&self) -> Vec<String> {
406 match self.inner.state.read().await {
407 Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
408 Err(err) => {
409 warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
410
411 vec![]
412 }
413 }
414 }
415}
416
417pub(super) struct RoomEventCacheInner {
419 room_id: OwnedRoomId,
421
422 pub weak_room: WeakRoom,
423
424 pub state: LockedRoomEventCacheState,
426
427 pub pagination_batch_token_notifier: Notify,
429
430 pub shared_pagination_status: SharedObservable<SharedPaginationStatus>,
431
432 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
437
438 update_sender: RoomEventCacheUpdateSender,
440}
441
442impl RoomEventCacheInner {
443 fn new(
446 room_id: OwnedRoomId,
447 weak_room: WeakRoom,
448 state: LockedRoomEventCacheState,
449 shared_pagination_status: SharedObservable<SharedPaginationStatus>,
450 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
451 update_sender: RoomEventCacheUpdateSender,
452 ) -> Self {
453 Self {
454 room_id,
455 weak_room,
456 state,
457 update_sender,
458 pagination_batch_token_notifier: Default::default(),
459 auto_shrink_sender,
460 shared_pagination_status,
461 }
462 }
463
464 fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
465 if account_data.is_empty() {
466 return;
467 }
468
469 let mut handled_read_marker = false;
470
471 trace!("Handling account data");
472
473 for raw_event in account_data {
474 match raw_event.deserialize() {
475 Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
476 if handled_read_marker {
479 continue;
480 }
481
482 handled_read_marker = true;
483
484 self.update_sender.send(
486 RoomEventCacheUpdate::MoveReadMarkerTo { event_id: ev.content.event_id },
487 None,
488 );
489 }
490
491 Ok(_) => {
492 }
495
496 Err(e) => {
497 let event_type = raw_event.get_field::<String>("type").ok().flatten();
498 warn!(event_type, "Failed to deserialize account data: {e}");
499 }
500 }
501 }
502 }
503
504 async fn handle_timeline(
507 &self,
508 timeline: Timeline,
509 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
510 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
511 ) -> Result<()> {
512 self.handle_timeline_inner(
513 self.state.write().await?,
514 timeline,
515 ephemeral_events,
516 ambiguity_changes,
517 )
518 .await
519 }
520
521 async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
525 let state = self.state.write().await?;
526
527 if state.room_linked_chunk().events().next().is_some() {
531 return self
532 .handle_timeline_inner(
533 state,
534 Timeline { limited: false, prev_batch: None, events: vec![event] },
535 Vec::new(),
536 BTreeMap::new(),
537 )
538 .await;
539 }
540
541 Ok(())
542 }
543
544 async fn handle_timeline_inner(
545 &self,
546 mut state: RoomEventCacheStateLockWriteGuard<'_>,
547 timeline: Timeline,
548 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
549 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
550 ) -> Result<()> {
551 if timeline.events.is_empty()
552 && timeline.prev_batch.is_none()
553 && ephemeral_events.is_empty()
554 && ambiguity_changes.is_empty()
555 {
556 return Ok(());
557 }
558
559 trace!("adding new events");
561
562 let (stored_prev_batch_token, timeline_event_diffs) =
563 state.handle_sync(timeline, &ephemeral_events).await?;
564
565 drop(state);
566
567 if stored_prev_batch_token {
570 self.pagination_batch_token_notifier.notify_one();
571 }
572
573 if !timeline_event_diffs.is_empty() {
576 self.update_sender.send(
577 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
578 diffs: timeline_event_diffs,
579 origin: EventsOrigin::Sync,
580 }),
581 Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
582 );
583 }
584
585 if !ephemeral_events.is_empty() {
586 self.update_sender
587 .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events }, None);
588 }
589
590 if !ambiguity_changes.is_empty() {
591 self.update_sender
592 .send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes }, None);
593 }
594
595 Ok(())
596 }
597}
598
599#[derive(Clone, Copy)]
600pub(in super::super) enum PostProcessingOrigin {
601 Sync,
602 Backpagination,
603 #[cfg(feature = "e2e-encryption")]
604 Redecryption,
605}
606
607#[cfg(test)]
608mod tests {
609 use matrix_sdk_base::{RoomState, event_cache::Event};
610 use matrix_sdk_test::{async_test, event_factory::EventFactory};
611 use ruma::{
612 RoomId, event_id,
613 events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
614 room_id, user_id,
615 };
616
617 use crate::test_utils::logged_in_client;
618
619 #[async_test]
620 async fn test_find_event_by_id_with_edit_relation() {
621 let original_id = event_id!("$original");
622 let related_id = event_id!("$related");
623 let room_id = room_id!("!galette:saucisse.bzh");
624 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
625
626 assert_relations(
627 room_id,
628 f.text_msg("Original event").event_id(original_id).into(),
629 f.text_msg("* An edited event")
630 .edit(
631 original_id,
632 RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
633 )
634 .event_id(related_id)
635 .into(),
636 f,
637 )
638 .await;
639 }
640
641 #[async_test]
642 async fn test_find_event_by_id_with_thread_reply_relation() {
643 let original_id = event_id!("$original");
644 let related_id = event_id!("$related");
645 let room_id = room_id!("!galette:saucisse.bzh");
646 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
647
648 assert_relations(
649 room_id,
650 f.text_msg("Original event").event_id(original_id).into(),
651 f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
652 f,
653 )
654 .await;
655 }
656
657 #[async_test]
658 async fn test_find_event_by_id_with_reaction_relation() {
659 let original_id = event_id!("$original");
660 let related_id = event_id!("$related");
661 let room_id = room_id!("!galette:saucisse.bzh");
662 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
663
664 assert_relations(
665 room_id,
666 f.text_msg("Original event").event_id(original_id).into(),
667 f.reaction(original_id, ":D").event_id(related_id).into(),
668 f,
669 )
670 .await;
671 }
672
673 #[async_test]
674 async fn test_find_event_by_id_with_poll_response_relation() {
675 let original_id = event_id!("$original");
676 let related_id = event_id!("$related");
677 let room_id = room_id!("!galette:saucisse.bzh");
678 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
679
680 assert_relations(
681 room_id,
682 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
683 .event_id(original_id)
684 .into(),
685 f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
686 f,
687 )
688 .await;
689 }
690
691 #[async_test]
692 async fn test_find_event_by_id_with_poll_end_relation() {
693 let original_id = event_id!("$original");
694 let related_id = event_id!("$related");
695 let room_id = room_id!("!galette:saucisse.bzh");
696 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
697
698 assert_relations(
699 room_id,
700 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
701 .event_id(original_id)
702 .into(),
703 f.poll_end("Poll ended", original_id).event_id(related_id).into(),
704 f,
705 )
706 .await;
707 }
708
709 #[async_test]
710 async fn test_find_event_by_id_with_filtered_relationships() {
711 let original_id = event_id!("$original");
712 let related_id = event_id!("$related");
713 let associated_related_id = event_id!("$recursive_related");
714 let room_id = room_id!("!galette:saucisse.bzh");
715 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
716
717 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
718 let related_event = event_factory
719 .text_msg("* Edited event")
720 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
721 .event_id(related_id)
722 .into();
723 let associated_related_event =
724 event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
725
726 let client = logged_in_client(None).await;
727
728 let event_cache = client.event_cache();
729 event_cache.subscribe().unwrap();
730
731 client.base_client().get_or_create_room(room_id, RoomState::Joined);
732 let room = client.get_room(room_id).unwrap();
733
734 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
735
736 room_event_cache.save_events([original_event]).await;
738
739 room_event_cache.save_events([related_event]).await;
741
742 room_event_cache.save_events([associated_related_event]).await;
744
745 let filter = Some(vec![RelationType::Replacement]);
746 let (event, related_events) = room_event_cache
747 .find_event_with_relations(original_id, filter)
748 .await
749 .expect("Failed to find the event with relations")
750 .expect("Event has no relation");
751 let cached_event_id = event.event_id().unwrap();
753 assert_eq!(cached_event_id, original_id);
754
755 assert_eq!(related_events.len(), 1);
757
758 let related_event_id = related_events[0].event_id().unwrap();
759 assert_eq!(related_event_id, related_id);
760
761 let filter = Some(vec![RelationType::Thread]);
763 let (event, related_events) = room_event_cache
764 .find_event_with_relations(original_id, filter)
765 .await
766 .expect("Failed to find the event with relations")
767 .expect("Event has no relation");
768
769 let cached_event_id = event.event_id().unwrap();
771 assert_eq!(cached_event_id, original_id);
772 assert!(related_events.is_empty());
774 }
775
776 #[async_test]
777 async fn test_find_event_by_id_with_recursive_relation() {
778 let original_id = event_id!("$original");
779 let related_id = event_id!("$related");
780 let associated_related_id = event_id!("$recursive_related");
781 let room_id = room_id!("!galette:saucisse.bzh");
782 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
783
784 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
785 let related_event = event_factory
786 .text_msg("* Edited event")
787 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
788 .event_id(related_id)
789 .into();
790 let associated_related_event =
791 event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
792
793 let client = logged_in_client(None).await;
794
795 let event_cache = client.event_cache();
796 event_cache.subscribe().unwrap();
797
798 client.base_client().get_or_create_room(room_id, RoomState::Joined);
799 let room = client.get_room(room_id).unwrap();
800
801 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
802
803 room_event_cache.save_events([original_event]).await;
805
806 room_event_cache.save_events([related_event]).await;
808
809 room_event_cache.save_events([associated_related_event]).await;
811
812 let (event, related_events) = room_event_cache
813 .find_event_with_relations(original_id, None)
814 .await
815 .expect("Failed to find the event with relations")
816 .expect("Event has no relation");
817 let cached_event_id = event.event_id().unwrap();
819 assert_eq!(cached_event_id, original_id);
820
821 assert_eq!(related_events.len(), 2);
823
824 let related_event_id = related_events[0].event_id().unwrap();
825 assert_eq!(related_event_id, related_id);
826 let related_event_id = related_events[1].event_id().unwrap();
827 assert_eq!(related_event_id, associated_related_id);
828 }
829
830 async fn assert_relations(
831 room_id: &RoomId,
832 original_event: Event,
833 related_event: Event,
834 event_factory: EventFactory,
835 ) {
836 let client = logged_in_client(None).await;
837
838 let event_cache = client.event_cache();
839 event_cache.subscribe().unwrap();
840
841 client.base_client().get_or_create_room(room_id, RoomState::Joined);
842 let room = client.get_room(room_id).unwrap();
843
844 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
845
846 let original_event_id = original_event.event_id().unwrap();
848 room_event_cache.save_events([original_event]).await;
849
850 let unrelated_id = event_id!("$2");
852 room_event_cache
853 .save_events([event_factory
854 .text_msg("An unrelated event")
855 .event_id(unrelated_id)
856 .into()])
857 .await;
858
859 let related_id = related_event.event_id().unwrap();
861 room_event_cache.save_events([related_event]).await;
862
863 let (event, related_events) = room_event_cache
864 .find_event_with_relations(&original_event_id, None)
865 .await
866 .expect("Failed to find the event with relations")
867 .expect("Event has no relation");
868 let cached_event_id = event.event_id().unwrap();
870 assert_eq!(cached_event_id, original_event_id);
871
872 let related_event_id = related_events[0].event_id().unwrap();
874 assert_eq!(related_event_id, related_id);
875 }
876}
877
878#[cfg(all(test, not(target_family = "wasm")))] mod timed_tests {
880 use std::{ops::Not, sync::Arc};
881
882 use assert_matches::assert_matches;
883 use assert_matches2::assert_let;
884 use eyeball_im::VectorDiff;
885 use futures_util::FutureExt;
886 use matrix_sdk_base::{
887 RoomState,
888 event_cache::{
889 Gap,
890 store::{EventCacheStore as _, MemoryStore},
891 },
892 linked_chunk::{
893 ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
894 lazy_loader::from_all_chunks,
895 },
896 store::StoreConfig,
897 sync::{JoinedRoomUpdate, Timeline},
898 };
899 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
900 use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
901 use ruma::{
902 EventId, event_id,
903 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
904 room_id,
905 serde::Raw,
906 user_id,
907 };
908 use serde_json::json;
909 use tokio::task::yield_now;
910
911 use super::{
912 super::{
913 super::TimelineVectorDiffs, lock::Reload as _,
914 pagination::LoadMoreEventsBackwardsOutcome,
915 },
916 RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
917 };
918 use crate::{assert_let_timeout, test_utils::client::MockClientBuilder};
919
920 #[async_test]
921 async fn test_write_to_storage() {
922 let room_id = room_id!("!galette:saucisse.bzh");
923 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
924
925 let event_cache_store = Arc::new(MemoryStore::new());
926
927 let client = MockClientBuilder::new(None)
928 .on_builder(|builder| {
929 builder.store_config(
930 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
931 .event_cache_store(event_cache_store.clone()),
932 )
933 })
934 .build()
935 .await;
936
937 let event_cache = client.event_cache();
938
939 event_cache.subscribe().unwrap();
941
942 client.base_client().get_or_create_room(room_id, RoomState::Joined);
943 let room = client.get_room(room_id).unwrap();
944
945 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
946 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
947
948 let timeline = Timeline {
950 limited: true,
951 prev_batch: Some("raclette".to_owned()),
952 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
953 };
954
955 room_event_cache
956 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
957 .await
958 .unwrap();
959
960 assert_matches!(
962 generic_stream.recv().await,
963 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
964 assert_eq!(expected_room_id, room_id);
965 }
966 );
967 assert!(generic_stream.is_empty());
968
969 let linked_chunk = from_all_chunks::<3, _, _>(
971 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
972 )
973 .unwrap()
974 .unwrap();
975
976 assert_eq!(linked_chunk.chunks().count(), 2);
977
978 let mut chunks = linked_chunk.chunks();
979
980 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
982 assert_eq!(gap.token, "raclette");
983 });
984
985 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
987 assert_eq!(events.len(), 1);
988 let deserialized = events[0].raw().deserialize().unwrap();
989 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
990 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
991 });
992
993 assert!(chunks.next().is_none());
995 }
996
997 #[async_test]
998 async fn test_write_to_storage_strips_bundled_relations() {
999 let room_id = room_id!("!galette:saucisse.bzh");
1000 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1001
1002 let event_cache_store = Arc::new(MemoryStore::new());
1003
1004 let client = MockClientBuilder::new(None)
1005 .on_builder(|builder| {
1006 builder.store_config(
1007 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1008 .event_cache_store(event_cache_store.clone()),
1009 )
1010 })
1011 .build()
1012 .await;
1013
1014 let event_cache = client.event_cache();
1015
1016 event_cache.subscribe().unwrap();
1018
1019 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1020 let room = client.get_room(room_id).unwrap();
1021
1022 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1023 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1024
1025 let ev = f
1027 .text_msg("hey yo")
1028 .sender(*ALICE)
1029 .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
1030 .into_event();
1031
1032 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
1033
1034 room_event_cache
1035 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1036 .await
1037 .unwrap();
1038
1039 assert_matches!(
1041 generic_stream.recv().await,
1042 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1043 assert_eq!(expected_room_id, room_id);
1044 }
1045 );
1046 assert!(generic_stream.is_empty());
1047
1048 {
1050 let events = room_event_cache.events().await.unwrap();
1051
1052 assert_eq!(events.len(), 1);
1053
1054 let ev = events[0].raw().deserialize().unwrap();
1055 assert_let!(
1056 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
1057 );
1058
1059 let original = msg.as_original().unwrap();
1060 assert_eq!(original.content.body(), "hey yo");
1061 assert!(original.unsigned.relations.replace.is_some());
1062 }
1063
1064 let linked_chunk = from_all_chunks::<3, _, _>(
1066 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
1067 )
1068 .unwrap()
1069 .unwrap();
1070
1071 assert_eq!(linked_chunk.chunks().count(), 1);
1072
1073 let mut chunks = linked_chunk.chunks();
1074 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1075 assert_eq!(events.len(), 1);
1076
1077 let ev = events[0].raw().deserialize().unwrap();
1078 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
1079
1080 let original = msg.as_original().unwrap();
1081 assert_eq!(original.content.body(), "hey yo");
1082 assert!(original.unsigned.relations.replace.is_none());
1083 });
1084
1085 assert!(chunks.next().is_none());
1087 }
1088
1089 #[async_test]
1090 async fn test_clear() {
1091 let room_id = room_id!("!galette:saucisse.bzh");
1092 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1093
1094 let event_cache_store = Arc::new(MemoryStore::new());
1095
1096 let event_id1 = event_id!("$1");
1097 let event_id2 = event_id!("$2");
1098
1099 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1100 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1101
1102 event_cache_store
1104 .handle_linked_chunk_updates(
1105 LinkedChunkId::Room(room_id),
1106 vec![
1107 Update::NewItemsChunk {
1109 previous: None,
1110 new: ChunkIdentifier::new(0),
1111 next: None,
1112 },
1113 Update::NewGapChunk {
1115 previous: Some(ChunkIdentifier::new(0)),
1116 new: ChunkIdentifier::new(42),
1118 next: None,
1119 gap: Gap { token: "comté".to_owned() },
1120 },
1121 Update::NewItemsChunk {
1123 previous: Some(ChunkIdentifier::new(42)),
1124 new: ChunkIdentifier::new(1),
1125 next: None,
1126 },
1127 Update::PushItems {
1128 at: Position::new(ChunkIdentifier::new(1), 0),
1129 items: vec![ev1.clone()],
1130 },
1131 Update::NewItemsChunk {
1133 previous: Some(ChunkIdentifier::new(1)),
1134 new: ChunkIdentifier::new(2),
1135 next: None,
1136 },
1137 Update::PushItems {
1138 at: Position::new(ChunkIdentifier::new(2), 0),
1139 items: vec![ev2.clone()],
1140 },
1141 ],
1142 )
1143 .await
1144 .unwrap();
1145
1146 let client = MockClientBuilder::new(None)
1147 .on_builder(|builder| {
1148 builder.store_config(
1149 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1150 .event_cache_store(event_cache_store.clone()),
1151 )
1152 })
1153 .build()
1154 .await;
1155
1156 let event_cache = client.event_cache();
1157
1158 event_cache.subscribe().unwrap();
1160
1161 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1162 let room = client.get_room(room_id).unwrap();
1163
1164 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1165
1166 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
1167 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1168
1169 {
1171 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1172 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
1173 }
1174
1175 {
1177 assert_eq!(items.len(), 1);
1179 assert_eq!(items[0].event_id().unwrap(), event_id2);
1180
1181 assert!(stream.is_empty());
1182 }
1183
1184 {
1186 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1187
1188 assert_let_timeout!(
1189 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1190 stream.recv()
1191 );
1192 assert_eq!(diffs.len(), 1);
1193 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1194 assert_eq!(event.event_id().unwrap(), event_id1);
1196 });
1197
1198 assert!(stream.is_empty());
1199
1200 assert_let_timeout!(
1201 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
1202 generic_stream.recv()
1203 );
1204 assert_eq!(room_id, expected_room_id);
1205 assert!(generic_stream.is_empty());
1206 }
1207
1208 room_event_cache.clear().await.unwrap();
1210
1211 assert_let_timeout!(
1213 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1214 stream.recv()
1215 );
1216 assert_eq!(diffs.len(), 1);
1217 assert_let!(VectorDiff::Clear = &diffs[0]);
1218
1219 assert_let_timeout!(
1221 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
1222 );
1223 assert_eq!(received_room_id, room_id);
1224 assert!(generic_stream.is_empty());
1225
1226 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1229
1230 let items = room_event_cache.events().await.unwrap();
1232 assert!(items.is_empty());
1233
1234 let linked_chunk = from_all_chunks::<3, _, _>(
1236 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
1237 )
1238 .unwrap()
1239 .unwrap();
1240
1241 assert_eq!(linked_chunk.num_items(), 0);
1245 }
1246
1247 #[async_test]
1248 async fn test_load_from_storage() {
1249 let room_id = room_id!("!galette:saucisse.bzh");
1250 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1251
1252 let event_cache_store = Arc::new(MemoryStore::new());
1253
1254 let event_id1 = event_id!("$1");
1255 let event_id2 = event_id!("$2");
1256
1257 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1258 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1259
1260 event_cache_store
1262 .handle_linked_chunk_updates(
1263 LinkedChunkId::Room(room_id),
1264 vec![
1265 Update::NewItemsChunk {
1267 previous: None,
1268 new: ChunkIdentifier::new(0),
1269 next: None,
1270 },
1271 Update::NewGapChunk {
1273 previous: Some(ChunkIdentifier::new(0)),
1274 new: ChunkIdentifier::new(42),
1276 next: None,
1277 gap: Gap { token: "cheddar".to_owned() },
1278 },
1279 Update::NewItemsChunk {
1281 previous: Some(ChunkIdentifier::new(42)),
1282 new: ChunkIdentifier::new(1),
1283 next: None,
1284 },
1285 Update::PushItems {
1286 at: Position::new(ChunkIdentifier::new(1), 0),
1287 items: vec![ev1.clone()],
1288 },
1289 Update::NewItemsChunk {
1291 previous: Some(ChunkIdentifier::new(1)),
1292 new: ChunkIdentifier::new(2),
1293 next: None,
1294 },
1295 Update::PushItems {
1296 at: Position::new(ChunkIdentifier::new(2), 0),
1297 items: vec![ev2.clone()],
1298 },
1299 ],
1300 )
1301 .await
1302 .unwrap();
1303
1304 let client = MockClientBuilder::new(None)
1305 .on_builder(|builder| {
1306 builder.store_config(
1307 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1308 .event_cache_store(event_cache_store.clone()),
1309 )
1310 })
1311 .build()
1312 .await;
1313
1314 let event_cache = client.event_cache();
1315
1316 event_cache.subscribe().unwrap();
1318
1319 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1321
1322 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1323 let room = client.get_room(room_id).unwrap();
1324
1325 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1326
1327 assert_matches!(
1330 generic_stream.recv().await,
1331 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1332 assert_eq!(room_id, expected_room_id);
1333 }
1334 );
1335 assert!(generic_stream.is_empty());
1336
1337 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
1338
1339 assert_eq!(items.len(), 1);
1342 assert_eq!(items[0].event_id().unwrap(), event_id2);
1343 assert!(stream.is_empty());
1344
1345 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1347 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
1348
1349 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1351
1352 assert_let_timeout!(
1353 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1354 stream.recv()
1355 );
1356 assert_eq!(diffs.len(), 1);
1357 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1358 assert_eq!(event.event_id().unwrap(), event_id1);
1359 });
1360
1361 assert!(stream.is_empty());
1362
1363 assert_matches!(
1365 generic_stream.recv().await,
1366 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1367 assert_eq!(expected_room_id, room_id);
1368 }
1369 );
1370 assert!(generic_stream.is_empty());
1371
1372 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
1374
1375 room_event_cache
1376 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1377 .await
1378 .unwrap();
1379
1380 assert!(generic_stream.recv().now_or_never().is_none());
1383
1384 let items = room_event_cache.events().await.unwrap();
1389 assert_eq!(items.len(), 2);
1390 assert_eq!(items[0].event_id().unwrap(), event_id1);
1391 assert_eq!(items[1].event_id().unwrap(), event_id2);
1392 }
1393
1394 #[async_test]
1395 async fn test_load_from_storage_resilient_to_failure() {
1396 let room_id = room_id!("!fondue:patate.ch");
1397 let event_cache_store = Arc::new(MemoryStore::new());
1398
1399 let event = EventFactory::new()
1400 .room(room_id)
1401 .sender(user_id!("@ben:saucisse.bzh"))
1402 .text_msg("foo")
1403 .event_id(event_id!("$42"))
1404 .into_event();
1405
1406 event_cache_store
1408 .handle_linked_chunk_updates(
1409 LinkedChunkId::Room(room_id),
1410 vec![
1411 Update::NewItemsChunk {
1412 previous: None,
1413 new: ChunkIdentifier::new(0),
1414 next: None,
1415 },
1416 Update::PushItems {
1417 at: Position::new(ChunkIdentifier::new(0), 0),
1418 items: vec![event],
1419 },
1420 Update::NewItemsChunk {
1421 previous: Some(ChunkIdentifier::new(0)),
1422 new: ChunkIdentifier::new(1),
1423 next: Some(ChunkIdentifier::new(0)),
1424 },
1425 ],
1426 )
1427 .await
1428 .unwrap();
1429
1430 let client = MockClientBuilder::new(None)
1431 .on_builder(|builder| {
1432 builder.store_config(
1433 StoreConfig::new(CrossProcessLockConfig::multi_process("holder"))
1434 .event_cache_store(event_cache_store.clone()),
1435 )
1436 })
1437 .build()
1438 .await;
1439
1440 let event_cache = client.event_cache();
1441
1442 event_cache.subscribe().unwrap();
1444
1445 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1446 let room = client.get_room(room_id).unwrap();
1447
1448 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1449
1450 let items = room_event_cache.events().await.unwrap();
1451
1452 assert!(items.is_empty());
1455
1456 let raw_chunks =
1459 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
1460 assert!(raw_chunks.is_empty());
1461 }
1462
1463 #[async_test]
1464 async fn test_no_useless_gaps() {
1465 let room_id = room_id!("!galette:saucisse.bzh");
1466
1467 let client = MockClientBuilder::new(None).build().await;
1468
1469 let event_cache = client.event_cache();
1470 event_cache.subscribe().unwrap();
1471
1472 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1473 let room = client.get_room(room_id).unwrap();
1474 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1475 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1476
1477 let f = EventFactory::new().room(room_id).sender(*ALICE);
1478
1479 room_event_cache
1482 .handle_joined_room_update(JoinedRoomUpdate {
1483 timeline: Timeline {
1484 limited: true,
1485 prev_batch: Some("raclette".to_owned()),
1486 events: vec![f.text_msg("hey yo").into_event()],
1487 },
1488 ..Default::default()
1489 })
1490 .await
1491 .unwrap();
1492
1493 assert_matches!(
1495 generic_stream.recv().await,
1496 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1497 assert_eq!(expected_room_id, room_id);
1498 }
1499 );
1500 assert!(generic_stream.is_empty());
1501
1502 {
1503 let state = room_event_cache.inner.state.read().await.unwrap();
1504
1505 let mut num_gaps = 0;
1506 let mut num_events = 0;
1507
1508 for c in state.room_linked_chunk().chunks() {
1509 match c.content() {
1510 ChunkContent::Items(items) => num_events += items.len(),
1511 ChunkContent::Gap(_) => num_gaps += 1,
1512 }
1513 }
1514
1515 assert_eq!(num_gaps, 0);
1518 assert_eq!(num_events, 1);
1519 }
1520
1521 assert_matches!(
1523 room_event_cache.pagination().load_more_events_backwards().await.unwrap(),
1524 LoadMoreEventsBackwardsOutcome::Gap { .. }
1525 );
1526
1527 {
1528 let state = room_event_cache.inner.state.read().await.unwrap();
1529
1530 let mut num_gaps = 0;
1531 let mut num_events = 0;
1532
1533 for c in state.room_linked_chunk().chunks() {
1534 match c.content() {
1535 ChunkContent::Items(items) => num_events += items.len(),
1536 ChunkContent::Gap(_) => num_gaps += 1,
1537 }
1538 }
1539
1540 assert_eq!(num_gaps, 1);
1542 assert_eq!(num_events, 1);
1543 }
1544
1545 room_event_cache
1548 .handle_joined_room_update(JoinedRoomUpdate {
1549 timeline: Timeline {
1550 limited: false,
1551 prev_batch: Some("fondue".to_owned()),
1552 events: vec![f.text_msg("sup").into_event()],
1553 },
1554 ..Default::default()
1555 })
1556 .await
1557 .unwrap();
1558
1559 assert_matches!(
1561 generic_stream.recv().await,
1562 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1563 assert_eq!(expected_room_id, room_id);
1564 }
1565 );
1566 assert!(generic_stream.is_empty());
1567
1568 {
1569 let state = room_event_cache.inner.state.read().await.unwrap();
1570
1571 let mut num_gaps = 0;
1572 let mut num_events = 0;
1573
1574 for c in state.room_linked_chunk().chunks() {
1575 match c.content() {
1576 ChunkContent::Items(items) => num_events += items.len(),
1577 ChunkContent::Gap(gap) => {
1578 assert_eq!(gap.token, "raclette");
1579 num_gaps += 1;
1580 }
1581 }
1582 }
1583
1584 assert_eq!(num_gaps, 1);
1586 assert_eq!(num_events, 2);
1587 }
1588 }
1589
1590 #[async_test]
1591 async fn test_shrink_to_last_chunk() {
1592 let room_id = room_id!("!galette:saucisse.bzh");
1593
1594 let client = MockClientBuilder::new(None).build().await;
1595
1596 let f = EventFactory::new().room(room_id);
1597
1598 let evid1 = event_id!("$1");
1599 let evid2 = event_id!("$2");
1600
1601 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1602 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1603
1604 {
1606 client
1607 .event_cache_store()
1608 .lock()
1609 .await
1610 .expect("Could not acquire the event cache lock")
1611 .as_clean()
1612 .expect("Could not acquire a clean event cache lock")
1613 .handle_linked_chunk_updates(
1614 LinkedChunkId::Room(room_id),
1615 vec![
1616 Update::NewItemsChunk {
1617 previous: None,
1618 new: ChunkIdentifier::new(0),
1619 next: None,
1620 },
1621 Update::PushItems {
1622 at: Position::new(ChunkIdentifier::new(0), 0),
1623 items: vec![ev1],
1624 },
1625 Update::NewItemsChunk {
1626 previous: Some(ChunkIdentifier::new(0)),
1627 new: ChunkIdentifier::new(1),
1628 next: None,
1629 },
1630 Update::PushItems {
1631 at: Position::new(ChunkIdentifier::new(1), 0),
1632 items: vec![ev2],
1633 },
1634 ],
1635 )
1636 .await
1637 .unwrap();
1638 }
1639
1640 let event_cache = client.event_cache();
1641 event_cache.subscribe().unwrap();
1642
1643 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1644 let room = client.get_room(room_id).unwrap();
1645 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1646
1647 let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
1649 assert_eq!(events.len(), 1);
1650 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1651 assert!(stream.is_empty());
1652
1653 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1654
1655 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1657 assert_eq!(outcome.events.len(), 1);
1658 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1659 assert!(outcome.reached_start);
1660
1661 assert_let_timeout!(
1663 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1664 stream.recv()
1665 );
1666 assert_eq!(diffs.len(), 1);
1667 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1668 assert_eq!(value.event_id().as_deref(), Some(evid1));
1669 });
1670
1671 assert!(stream.is_empty());
1672
1673 assert_let_timeout!(
1675 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1676 );
1677 assert_eq!(expected_room_id, room_id);
1678 assert!(generic_stream.is_empty());
1679
1680 room_event_cache
1682 .inner
1683 .state
1684 .write()
1685 .await
1686 .unwrap()
1687 .reload()
1688 .await
1689 .expect("shrinking should succeed");
1690
1691 assert_let_timeout!(
1693 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1694 stream.recv()
1695 );
1696 assert_eq!(diffs.len(), 2);
1697 assert_matches!(&diffs[0], VectorDiff::Clear);
1698 assert_matches!(&diffs[1], VectorDiff::Append { values} => {
1699 assert_eq!(values.len(), 1);
1700 assert_eq!(values[0].event_id().as_deref(), Some(evid2));
1701 });
1702
1703 assert!(stream.is_empty());
1704
1705 assert_let_timeout!(Ok(RoomEventCacheGenericUpdate { .. }) = generic_stream.recv());
1707 assert!(generic_stream.is_empty());
1708
1709 let events = room_event_cache.events().await.unwrap();
1711 assert_eq!(events.len(), 1);
1712 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1713
1714 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1717 assert_eq!(outcome.events.len(), 1);
1718 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1719 assert!(outcome.reached_start);
1720 }
1721
1722 #[async_test]
1723 async fn test_room_ordering() {
1724 let room_id = room_id!("!galette:saucisse.bzh");
1725
1726 let client = MockClientBuilder::new(None).build().await;
1727
1728 let f = EventFactory::new().room(room_id).sender(*ALICE);
1729
1730 let evid1 = event_id!("$1");
1731 let evid2 = event_id!("$2");
1732 let evid3 = event_id!("$3");
1733
1734 let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
1735 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1736 let ev3 = f.text_msg("yo").event_id(evid3).into_event();
1737
1738 {
1740 client
1741 .event_cache_store()
1742 .lock()
1743 .await
1744 .expect("Could not acquire the event cache lock")
1745 .as_clean()
1746 .expect("Could not acquire a clean event cache lock")
1747 .handle_linked_chunk_updates(
1748 LinkedChunkId::Room(room_id),
1749 vec![
1750 Update::NewItemsChunk {
1751 previous: None,
1752 new: ChunkIdentifier::new(0),
1753 next: None,
1754 },
1755 Update::PushItems {
1756 at: Position::new(ChunkIdentifier::new(0), 0),
1757 items: vec![ev1, ev2],
1758 },
1759 Update::NewItemsChunk {
1760 previous: Some(ChunkIdentifier::new(0)),
1761 new: ChunkIdentifier::new(1),
1762 next: None,
1763 },
1764 Update::PushItems {
1765 at: Position::new(ChunkIdentifier::new(1), 0),
1766 items: vec![ev3.clone()],
1767 },
1768 ],
1769 )
1770 .await
1771 .unwrap();
1772 }
1773
1774 let event_cache = client.event_cache();
1775 event_cache.subscribe().unwrap();
1776
1777 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1778 let room = client.get_room(room_id).unwrap();
1779 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1780
1781 {
1784 let state = room_event_cache.inner.state.read().await.unwrap();
1785 let room_linked_chunk = state.room_linked_chunk();
1786
1787 assert_eq!(
1789 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1790 Some(0)
1791 );
1792
1793 assert_eq!(
1795 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1796 Some(1)
1797 );
1798
1799 let mut events = room_linked_chunk.events();
1801 let (pos, ev) = events.next().unwrap();
1802 assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
1803 assert_eq!(ev.event_id().as_deref(), Some(evid3));
1804 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1805
1806 assert!(events.next().is_none());
1808 }
1809
1810 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1812 assert!(outcome.reached_start);
1813
1814 {
1817 let state = room_event_cache.inner.state.read().await.unwrap();
1818 let room_linked_chunk = state.room_linked_chunk();
1819
1820 for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
1821 assert_eq!(room_linked_chunk.event_order(pos), Some(i));
1822 }
1823 }
1824
1825 let evid4 = event_id!("$4");
1830 room_event_cache
1831 .handle_joined_room_update(JoinedRoomUpdate {
1832 timeline: Timeline {
1833 limited: true,
1834 prev_batch: Some("fondue".to_owned()),
1835 events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
1836 },
1837 ..Default::default()
1838 })
1839 .await
1840 .unwrap();
1841
1842 {
1843 let state = room_event_cache.inner.state.read().await.unwrap();
1844 let room_linked_chunk = state.room_linked_chunk();
1845
1846 let mut events = room_linked_chunk.events();
1848
1849 let (pos, ev) = events.next().unwrap();
1850 assert_eq!(ev.event_id().as_deref(), Some(evid3));
1851 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1852
1853 let (pos, ev) = events.next().unwrap();
1854 assert_eq!(ev.event_id().as_deref(), Some(evid4));
1855 assert_eq!(room_linked_chunk.event_order(pos), Some(3));
1856
1857 assert!(events.next().is_none());
1859
1860 assert_eq!(
1862 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1863 Some(0)
1864 );
1865 assert_eq!(
1866 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1867 Some(1)
1868 );
1869
1870 assert_eq!(
1873 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
1874 None
1875 );
1876 }
1877 }
1878
1879 #[async_test]
1880 async fn test_auto_shrink_after_all_subscribers_are_gone() {
1881 let room_id = room_id!("!galette:saucisse.bzh");
1882
1883 let client = MockClientBuilder::new(None).build().await;
1884
1885 let f = EventFactory::new().room(room_id);
1886
1887 let evid1 = event_id!("$1");
1888 let evid2 = event_id!("$2");
1889
1890 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1891 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1892
1893 {
1895 client
1896 .event_cache_store()
1897 .lock()
1898 .await
1899 .expect("Could not acquire the event cache lock")
1900 .as_clean()
1901 .expect("Could not acquire a clean event cache lock")
1902 .handle_linked_chunk_updates(
1903 LinkedChunkId::Room(room_id),
1904 vec![
1905 Update::NewItemsChunk {
1906 previous: None,
1907 new: ChunkIdentifier::new(0),
1908 next: None,
1909 },
1910 Update::PushItems {
1911 at: Position::new(ChunkIdentifier::new(0), 0),
1912 items: vec![ev1],
1913 },
1914 Update::NewItemsChunk {
1915 previous: Some(ChunkIdentifier::new(0)),
1916 new: ChunkIdentifier::new(1),
1917 next: None,
1918 },
1919 Update::PushItems {
1920 at: Position::new(ChunkIdentifier::new(1), 0),
1921 items: vec![ev2],
1922 },
1923 ],
1924 )
1925 .await
1926 .unwrap();
1927 }
1928
1929 let event_cache = client.event_cache();
1930 event_cache.subscribe().unwrap();
1931
1932 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1933 let room = client.get_room(room_id).unwrap();
1934 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1935
1936 let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
1938 assert_eq!(events1.len(), 1);
1939 assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
1940 assert!(stream1.is_empty());
1941
1942 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1943
1944 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1946 assert_eq!(outcome.events.len(), 1);
1947 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1948 assert!(outcome.reached_start);
1949
1950 assert_let_timeout!(
1953 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1954 stream1.recv()
1955 );
1956 assert_eq!(diffs.len(), 1);
1957 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1958 assert_eq!(value.event_id().as_deref(), Some(evid1));
1959 });
1960
1961 assert!(stream1.is_empty());
1962
1963 assert_let_timeout!(
1964 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1965 );
1966 assert_eq!(expected_room_id, room_id);
1967 assert!(generic_stream.is_empty());
1968
1969 let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
1973 assert_eq!(events2.len(), 2);
1974 assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
1975 assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
1976 assert!(stream2.is_empty());
1977
1978 drop(stream1);
1980 yield_now().await;
1981
1982 assert!(stream2.is_empty());
1984
1985 drop(stream2);
1987 yield_now().await;
1988
1989 {
1992 let state = room_event_cache.inner.state.read().await.unwrap();
1994 assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
1995 }
1996
1997 let events3 = room_event_cache.events().await.unwrap();
1999 assert_eq!(events3.len(), 1);
2000 assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
2001 }
2002
2003 #[async_test]
2004 async fn test_rfind_map_event_in_memory_by() {
2005 let user_id = user_id!("@mnt_io:matrix.org");
2006 let room_id = room_id!("!raclette:patate.ch");
2007 let client = MockClientBuilder::new(None).build().await;
2008
2009 let event_factory = EventFactory::new().room(room_id);
2010
2011 let event_id_0 = event_id!("$ev0");
2012 let event_id_1 = event_id!("$ev1");
2013 let event_id_2 = event_id!("$ev2");
2014 let event_id_3 = event_id!("$ev3");
2015
2016 let event_0 =
2017 event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
2018 let event_1 =
2019 event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
2020 let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
2021 let event_3 =
2022 event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
2023
2024 {
2027 client
2028 .event_cache_store()
2029 .lock()
2030 .await
2031 .expect("Could not acquire the event cache lock")
2032 .as_clean()
2033 .expect("Could not acquire a clean event cache lock")
2034 .handle_linked_chunk_updates(
2035 LinkedChunkId::Room(room_id),
2036 vec![
2037 Update::NewItemsChunk {
2038 previous: None,
2039 new: ChunkIdentifier::new(0),
2040 next: None,
2041 },
2042 Update::PushItems {
2043 at: Position::new(ChunkIdentifier::new(0), 0),
2044 items: vec![event_3],
2045 },
2046 Update::NewItemsChunk {
2047 previous: Some(ChunkIdentifier::new(0)),
2048 new: ChunkIdentifier::new(1),
2049 next: None,
2050 },
2051 Update::PushItems {
2052 at: Position::new(ChunkIdentifier::new(1), 0),
2053 items: vec![event_0, event_1, event_2],
2054 },
2055 ],
2056 )
2057 .await
2058 .unwrap();
2059 }
2060
2061 let event_cache = client.event_cache();
2062 event_cache.subscribe().unwrap();
2063
2064 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2065 let room = client.get_room(room_id).unwrap();
2066 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2067
2068 assert_matches!(
2070 room_event_cache
2071 .rfind_map_event_in_memory_by(|event| {
2072 (event.sender().as_deref() == Some(*BOB)).then(|| event.event_id())
2073 })
2074 .await,
2075 Ok(Some(event_id)) => {
2076 assert_eq!(event_id.as_deref(), Some(event_id_0));
2077 }
2078 );
2079
2080 assert_matches!(
2083 room_event_cache
2084 .rfind_map_event_in_memory_by(|event| {
2085 (event.sender().as_deref() == Some(*ALICE)).then(|| event.event_id())
2086 })
2087 .await,
2088 Ok(Some(event_id)) => {
2089 assert_eq!(event_id.as_deref(), Some(event_id_2));
2090 }
2091 );
2092
2093 assert!(
2095 room_event_cache
2096 .rfind_map_event_in_memory_by(|event| {
2097 (event.sender().as_deref() == Some(user_id)).then(|| event.event_id())
2098 })
2099 .await
2100 .unwrap()
2101 .is_none()
2102 );
2103
2104 assert!(
2106 room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap().is_none()
2107 );
2108 }
2109
2110 #[async_test]
2111 async fn test_reload_when_dirty() {
2112 let user_id = user_id!("@mnt_io:matrix.org");
2113 let room_id = room_id!("!raclette:patate.ch");
2114
2115 let event_cache_store = MemoryStore::new();
2117
2118 let client_p0 = MockClientBuilder::new(None)
2120 .on_builder(|builder| {
2121 builder.store_config(
2122 StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2123 .event_cache_store(event_cache_store.clone()),
2124 )
2125 })
2126 .build()
2127 .await;
2128
2129 let client_p1 = MockClientBuilder::new(None)
2131 .on_builder(|builder| {
2132 builder.store_config(
2133 StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2134 .event_cache_store(event_cache_store),
2135 )
2136 })
2137 .build()
2138 .await;
2139
2140 let event_factory = EventFactory::new().room(room_id).sender(user_id);
2141
2142 let ev_id_0 = event_id!("$ev_0");
2143 let ev_id_1 = event_id!("$ev_1");
2144
2145 let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
2146 let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
2147
2148 client_p0
2150 .event_cache_store()
2151 .lock()
2152 .await
2153 .expect("[p0] Could not acquire the event cache lock")
2154 .as_clean()
2155 .expect("[p0] Could not acquire a clean event cache lock")
2156 .handle_linked_chunk_updates(
2157 LinkedChunkId::Room(room_id),
2158 vec![
2159 Update::NewItemsChunk {
2160 previous: None,
2161 new: ChunkIdentifier::new(0),
2162 next: None,
2163 },
2164 Update::PushItems {
2165 at: Position::new(ChunkIdentifier::new(0), 0),
2166 items: vec![ev_0],
2167 },
2168 Update::NewItemsChunk {
2169 previous: Some(ChunkIdentifier::new(0)),
2170 new: ChunkIdentifier::new(1),
2171 next: None,
2172 },
2173 Update::PushItems {
2174 at: Position::new(ChunkIdentifier::new(1), 0),
2175 items: vec![ev_1],
2176 },
2177 ],
2178 )
2179 .await
2180 .unwrap();
2181
2182 let (room_event_cache_p0, room_event_cache_p1) = {
2184 let event_cache_p0 = client_p0.event_cache();
2185 event_cache_p0.subscribe().unwrap();
2186
2187 let event_cache_p1 = client_p1.event_cache();
2188 event_cache_p1.subscribe().unwrap();
2189
2190 client_p0.base_client().get_or_create_room(room_id, RoomState::Joined);
2191 client_p1.base_client().get_or_create_room(room_id, RoomState::Joined);
2192
2193 let (room_event_cache_p0, _drop_handles) =
2194 client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
2195 let (room_event_cache_p1, _drop_handles) =
2196 client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
2197
2198 (room_event_cache_p0, room_event_cache_p1)
2199 };
2200
2201 let mut updates_stream_p0 = {
2206 let room_event_cache = &room_event_cache_p0;
2207
2208 let (initial_updates, mut updates_stream) =
2209 room_event_cache_p0.subscribe().await.unwrap();
2210
2211 assert_eq!(initial_updates.len(), 1);
2213 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2214 assert!(updates_stream.is_empty());
2215
2216 assert!(event_loaded(room_event_cache, ev_id_1).await);
2218
2219 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2221
2222 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2224
2225 assert_matches!(
2227 updates_stream.recv().await.unwrap(),
2228 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2229 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2230 assert_matches!(
2231 &diffs[0],
2232 VectorDiff::Insert { index: 0, value: event } => {
2233 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2234 }
2235 );
2236 }
2237 );
2238
2239 assert!(event_loaded(room_event_cache, ev_id_0).await);
2241
2242 updates_stream
2243 };
2244
2245 let mut updates_stream_p1 = {
2247 let room_event_cache = &room_event_cache_p1;
2248 let (initial_updates, mut updates_stream) =
2249 room_event_cache_p1.subscribe().await.unwrap();
2250
2251 assert_eq!(initial_updates.len(), 1);
2253 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2254 assert!(updates_stream.is_empty());
2255
2256 assert!(event_loaded(room_event_cache, ev_id_1).await);
2258
2259 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2261
2262 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2264
2265 assert_matches!(
2267 updates_stream.recv().await.unwrap(),
2268 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2269 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2270 assert_matches!(
2271 &diffs[0],
2272 VectorDiff::Insert { index: 0, value: event } => {
2273 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2274 }
2275 );
2276 }
2277 );
2278
2279 assert!(event_loaded(room_event_cache, ev_id_0).await);
2281
2282 updates_stream
2283 };
2284
2285 for _ in 0..3 {
2287 {
2291 let room_event_cache = &room_event_cache_p0;
2292 let updates_stream = &mut updates_stream_p0;
2293
2294 assert!(event_loaded(room_event_cache, ev_id_1).await);
2296
2297 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2300
2301 assert_matches!(
2303 updates_stream.recv().await.unwrap(),
2304 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2305 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2306 assert_matches!(&diffs[0], VectorDiff::Clear);
2307 assert_matches!(
2308 &diffs[1],
2309 VectorDiff::Append { values: events } => {
2310 assert_eq!(events.len(), 1);
2311 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2312 }
2313 );
2314 }
2315 );
2316
2317 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2319
2320 assert!(event_loaded(room_event_cache, ev_id_0).await);
2322
2323 assert_matches!(
2325 updates_stream.recv().await.unwrap(),
2326 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2327 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2328 assert_matches!(
2329 &diffs[0],
2330 VectorDiff::Insert { index: 0, value: event } => {
2331 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2332 }
2333 );
2334 }
2335 );
2336 }
2337
2338 {
2342 let room_event_cache = &room_event_cache_p1;
2343 let updates_stream = &mut updates_stream_p1;
2344
2345 assert!(event_loaded(room_event_cache, ev_id_1).await);
2347
2348 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2351
2352 assert_matches!(
2354 updates_stream.recv().await.unwrap(),
2355 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2356 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2357 assert_matches!(&diffs[0], VectorDiff::Clear);
2358 assert_matches!(
2359 &diffs[1],
2360 VectorDiff::Append { values: events } => {
2361 assert_eq!(events.len(), 1);
2362 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2363 }
2364 );
2365 }
2366 );
2367
2368 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2370
2371 assert!(event_loaded(room_event_cache, ev_id_0).await);
2373
2374 assert_matches!(
2376 updates_stream.recv().await.unwrap(),
2377 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2378 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2379 assert_matches!(
2380 &diffs[0],
2381 VectorDiff::Insert { index: 0, value: event } => {
2382 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2383 }
2384 );
2385 }
2386 );
2387 }
2388 }
2389
2390 for _ in 0..3 {
2393 {
2394 let room_event_cache = &room_event_cache_p0;
2395 let updates_stream = &mut updates_stream_p0;
2396
2397 let guard = room_event_cache.inner.state.read().await.unwrap();
2398
2399 assert!(guard.is_dirty().not());
2405
2406 assert_matches!(
2408 updates_stream.recv().await.unwrap(),
2409 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2410 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2411 assert_matches!(&diffs[0], VectorDiff::Clear);
2412 assert_matches!(
2413 &diffs[1],
2414 VectorDiff::Append { values: events } => {
2415 assert_eq!(events.len(), 1);
2416 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2417 }
2418 );
2419 }
2420 );
2421
2422 assert!(event_loaded(room_event_cache, ev_id_1).await);
2423 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2424
2425 drop(guard);
2431
2432 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2433 assert!(event_loaded(room_event_cache, ev_id_0).await);
2434
2435 assert_matches!(
2437 updates_stream.recv().await.unwrap(),
2438 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2439 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2440 assert_matches!(
2441 &diffs[0],
2442 VectorDiff::Insert { index: 0, value: event } => {
2443 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2444 }
2445 );
2446 }
2447 );
2448 }
2449
2450 {
2451 let room_event_cache = &room_event_cache_p1;
2452 let updates_stream = &mut updates_stream_p1;
2453
2454 let guard = room_event_cache.inner.state.read().await.unwrap();
2455
2456 assert!(guard.is_dirty().not());
2461
2462 assert_matches!(
2464 updates_stream.recv().await.unwrap(),
2465 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2466 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2467 assert_matches!(&diffs[0], VectorDiff::Clear);
2468 assert_matches!(
2469 &diffs[1],
2470 VectorDiff::Append { values: events } => {
2471 assert_eq!(events.len(), 1);
2472 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2473 }
2474 );
2475 }
2476 );
2477
2478 assert!(event_loaded(room_event_cache, ev_id_1).await);
2479 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2480
2481 drop(guard);
2487
2488 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2489 assert!(event_loaded(room_event_cache, ev_id_0).await);
2490
2491 assert_matches!(
2493 updates_stream.recv().await.unwrap(),
2494 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2495 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2496 assert_matches!(
2497 &diffs[0],
2498 VectorDiff::Insert { index: 0, value: event } => {
2499 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2500 }
2501 );
2502 }
2503 );
2504 }
2505 }
2506
2507 for _ in 0..3 {
2509 {
2510 let room_event_cache = &room_event_cache_p0;
2511 let updates_stream = &mut updates_stream_p0;
2512
2513 let guard = room_event_cache.inner.state.write().await.unwrap();
2514
2515 assert!(guard.is_dirty().not());
2517
2518 assert_matches!(
2520 updates_stream.recv().await.unwrap(),
2521 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2522 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2523 assert_matches!(&diffs[0], VectorDiff::Clear);
2524 assert_matches!(
2525 &diffs[1],
2526 VectorDiff::Append { values: events } => {
2527 assert_eq!(events.len(), 1);
2528 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2529 }
2530 );
2531 }
2532 );
2533
2534 drop(guard);
2537
2538 assert!(event_loaded(room_event_cache, ev_id_1).await);
2539 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2540
2541 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2542 assert!(event_loaded(room_event_cache, ev_id_0).await);
2543
2544 assert_matches!(
2546 updates_stream.recv().await.unwrap(),
2547 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2548 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2549 assert_matches!(
2550 &diffs[0],
2551 VectorDiff::Insert { index: 0, value: event } => {
2552 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2553 }
2554 );
2555 }
2556 );
2557 }
2558
2559 {
2560 let room_event_cache = &room_event_cache_p1;
2561 let updates_stream = &mut updates_stream_p1;
2562
2563 let guard = room_event_cache.inner.state.write().await.unwrap();
2564
2565 assert!(guard.is_dirty().not());
2567
2568 assert_matches!(
2570 updates_stream.recv().await.unwrap(),
2571 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2572 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2573 assert_matches!(&diffs[0], VectorDiff::Clear);
2574 assert_matches!(
2575 &diffs[1],
2576 VectorDiff::Append { values: events } => {
2577 assert_eq!(events.len(), 1);
2578 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2579 }
2580 );
2581 }
2582 );
2583
2584 drop(guard);
2587
2588 assert!(event_loaded(room_event_cache, ev_id_1).await);
2589 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2590
2591 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2592 assert!(event_loaded(room_event_cache, ev_id_0).await);
2593
2594 assert_matches!(
2596 updates_stream.recv().await.unwrap(),
2597 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2598 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2599 assert_matches!(
2600 &diffs[0],
2601 VectorDiff::Insert { index: 0, value: event } => {
2602 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2603 }
2604 );
2605 }
2606 );
2607 }
2608 }
2609 }
2610
2611 #[async_test]
2612 async fn test_load_when_dirty() {
2613 let room_id_0 = room_id!("!raclette:patate.ch");
2614 let room_id_1 = room_id!("!morbiflette:patate.ch");
2615
2616 let event_cache_store = MemoryStore::new();
2618
2619 let client_p0 = MockClientBuilder::new(None)
2621 .on_builder(|builder| {
2622 builder.store_config(
2623 StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2624 .event_cache_store(event_cache_store.clone()),
2625 )
2626 })
2627 .build()
2628 .await;
2629
2630 let client_p1 = MockClientBuilder::new(None)
2632 .on_builder(|builder| {
2633 builder.store_config(
2634 StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2635 .event_cache_store(event_cache_store),
2636 )
2637 })
2638 .build()
2639 .await;
2640
2641 let (room_event_cache_0_p0, room_event_cache_0_p1) = {
2643 let event_cache_p0 = client_p0.event_cache();
2644 event_cache_p0.subscribe().unwrap();
2645
2646 let event_cache_p1 = client_p1.event_cache();
2647 event_cache_p1.subscribe().unwrap();
2648
2649 client_p0.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2650 client_p0.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2651
2652 client_p1.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2653 client_p1.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2654
2655 let (room_event_cache_0_p0, _drop_handles) =
2656 client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2657 let (room_event_cache_0_p1, _drop_handles) =
2658 client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2659
2660 (room_event_cache_0_p0, room_event_cache_0_p1)
2661 };
2662
2663 {
2665 drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
2666 drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
2667 }
2668
2669 let (room_event_cache_1_p0, _) =
2673 client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
2674
2675 {
2677 let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
2678 assert!(guard.is_dirty().not());
2679 }
2680
2681 }
2684
2685 #[async_test]
2686 async fn test_uniq_read_marker() {
2687 let client = MockClientBuilder::new(None).build().await;
2688 let room_id = room_id!("!galette:saucisse.bzh");
2689 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2690
2691 let event_cache = client.event_cache();
2692
2693 event_cache.subscribe().unwrap();
2694
2695 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2696 let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
2697 let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
2698
2699 assert!(events.is_empty());
2700
2701 let read_marker_event = Raw::from_json_string(
2703 json!({
2704 "content": {
2705 "event_id": "$crepe:saucisse.bzh"
2706 },
2707 "room_id": "!galette:saucisse.bzh",
2708 "type": "m.fully_read"
2709 })
2710 .to_string(),
2711 )
2712 .unwrap();
2713 let account_data = vec![read_marker_event; 100];
2714
2715 room_event_cache
2716 .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
2717 .await
2718 .unwrap();
2719
2720 assert_matches!(
2722 stream.recv().await.unwrap(),
2723 RoomEventCacheUpdate::MoveReadMarkerTo { .. }
2724 );
2725
2726 assert!(stream.recv().now_or_never().is_none());
2727
2728 assert!(generic_stream.recv().now_or_never().is_none());
2730 }
2731
2732 async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
2733 room_event_cache
2734 .rfind_map_event_in_memory_by(|event| {
2735 (event.event_id().as_deref() == Some(event_id)).then_some(())
2736 })
2737 .await
2738 .unwrap()
2739 .is_some()
2740 }
2741}