1pub mod pagination;
16mod state;
17mod subscriber;
18mod updates;
19
20use std::{
21 collections::BTreeMap,
22 fmt,
23 sync::{Arc, atomic::Ordering},
24};
25
26use eyeball::SharedObservable;
27use matrix_sdk_base::{
28 deserialized_responses::AmbiguityChange,
29 event_cache::Event,
30 linked_chunk::Position,
31 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
32};
33use ruma::{
34 EventId, OwnedEventId, OwnedRoomId, RoomId,
35 events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
36 serde::Raw,
37};
38pub(in super::super) use state::RoomEventCacheStateLock;
39pub use subscriber::RoomEventCacheSubscriber;
40use tokio::sync::{Notify, broadcast::Receiver, mpsc};
41use tracing::{instrument, trace, warn};
42pub use updates::{
43 RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate,
44 RoomEventCacheUpdateSender,
45};
46
47use super::{
48 super::{
49 AutoShrinkChannelPayload, EventCacheError, EventsOrigin, PaginationStatus, Result,
50 RoomPagination,
51 },
52 TimelineVectorDiffs,
53 event_linked_chunk::sort_positions_descending,
54 thread::pagination::ThreadPagination,
55};
56use crate::{client::WeakClient, room::WeakRoom};
57
58#[derive(Clone)]
62pub struct RoomEventCache {
63 inner: Arc<RoomEventCacheInner>,
64}
65
66impl fmt::Debug for RoomEventCache {
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 f.debug_struct("RoomEventCache").finish_non_exhaustive()
69 }
70}
71
72impl RoomEventCache {
73 pub(in super::super) fn new(
75 client: WeakClient,
76 state: RoomEventCacheStateLock,
77 pagination_status: SharedObservable<PaginationStatus>,
78 room_id: OwnedRoomId,
79 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
80 update_sender: RoomEventCacheUpdateSender,
81 ) -> Self {
82 Self {
83 inner: Arc::new(RoomEventCacheInner::new(
84 client,
85 state,
86 pagination_status,
87 room_id,
88 auto_shrink_sender,
89 update_sender,
90 )),
91 }
92 }
93
94 pub fn room_id(&self) -> &RoomId {
96 &self.inner.room_id
97 }
98
99 pub async fn events(&self) -> Result<Vec<Event>> {
104 let state = self.inner.state.read().await?;
105
106 Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
107 }
108
109 pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
116 let state = self.inner.state.read().await?;
117 let events =
118 state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
119
120 let subscriber_count = state.subscriber_count();
121 let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
122 trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
123
124 let subscriber = RoomEventCacheSubscriber::new(
125 self.inner.update_sender.new_room_receiver(),
126 self.inner.room_id.clone(),
127 self.inner.auto_shrink_sender.clone(),
128 subscriber_count.clone(),
129 );
130
131 Ok((events, subscriber))
132 }
133
134 pub async fn subscribe_to_thread(
137 &self,
138 thread_root: OwnedEventId,
139 ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
140 let mut state = self.inner.state.write().await?;
141 Ok(state.subscribe_to_thread(thread_root))
142 }
143
144 pub async fn subscribe_to_pinned_events(
154 &self,
155 ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
156 let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
157 let state = self.inner.state.read().await?;
158
159 state.subscribe_to_pinned_events(room).await
160 }
161
162 pub fn pagination(&self) -> RoomPagination {
165 RoomPagination::new(self.inner.clone())
166 }
167
168 pub fn thread_pagination(&self, thread_id: OwnedEventId) -> ThreadPagination {
171 ThreadPagination::new(self.inner.clone(), thread_id)
172 }
173
174 pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
182 where
183 P: FnMut(&Event) -> Option<O>,
184 {
185 Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
186 }
187
188 pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
193 Ok(self
194 .inner
195 .state
196 .read()
197 .await?
198 .find_event(event_id)
199 .await
200 .ok()
201 .flatten()
202 .map(|(_loc, event)| event))
203 }
204
205 pub async fn find_event_with_relations(
217 &self,
218 event_id: &EventId,
219 filter: Option<Vec<RelationType>>,
220 ) -> Result<Option<(Event, Vec<Event>)>> {
221 Ok(self
223 .inner
224 .state
225 .read()
226 .await?
227 .find_event_with_relations(event_id, filter.clone())
228 .await
229 .ok()
230 .flatten())
231 }
232
233 pub async fn find_event_relations(
245 &self,
246 event_id: &EventId,
247 filter: Option<Vec<RelationType>>,
248 ) -> Result<Vec<Event>> {
249 self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
251 }
252
253 pub async fn clear(&self) -> Result<()> {
258 let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
260
261 self.inner.update_sender.send(
263 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
264 diffs: updates_as_vector_diffs,
265 origin: EventsOrigin::Cache,
266 }),
267 Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
268 );
269
270 Ok(())
271 }
272
273 pub(in super::super) fn state(&self) -> &RoomEventCacheStateLock {
275 &self.inner.state
276 }
277
278 #[instrument(skip_all, fields(room_id = %self.room_id()))]
280 pub(in super::super) async fn handle_joined_room_update(
281 &self,
282 updates: JoinedRoomUpdate,
283 ) -> Result<()> {
284 self.inner
285 .handle_timeline(updates.timeline, updates.ephemeral.clone(), updates.ambiguity_changes)
286 .await?;
287 self.inner.handle_account_data(updates.account_data);
288
289 Ok(())
290 }
291
292 #[instrument(skip_all, fields(room_id = %self.room_id()))]
294 pub(in super::super) async fn handle_left_room_update(
295 &self,
296 updates: LeftRoomUpdate,
297 ) -> Result<()> {
298 self.inner.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
299
300 Ok(())
301 }
302
303 pub(in super::super) fn update_sender(&self) -> &RoomEventCacheUpdateSender {
305 &self.inner.update_sender
306 }
307
308 pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
310 self.inner
311 .handle_timeline(
312 Timeline { limited: false, prev_batch: None, events: vec![event] },
313 Vec::new(),
314 BTreeMap::new(),
315 )
316 .await
317 }
318
319 pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
322 match self.inner.state.write().await {
323 Ok(mut state_guard) => {
324 if let Err(err) = state_guard.save_events(events).await {
325 warn!("couldn't save event in the event cache: {err}");
326 }
327 }
328
329 Err(err) => {
330 warn!("couldn't save event in the event cache: {err}");
331 }
332 }
333 }
334
335 pub async fn debug_string(&self) -> Vec<String> {
338 match self.inner.state.read().await {
339 Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
340 Err(err) => {
341 warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
342
343 vec![]
344 }
345 }
346 }
347}
348
349pub(in super::super) struct RoomEventCacheInner {
351 room_id: OwnedRoomId,
353
354 pub weak_room: WeakRoom,
355
356 pub state: RoomEventCacheStateLock,
358
359 pub pagination_batch_token_notifier: Notify,
361
362 pub pagination_status: SharedObservable<PaginationStatus>,
363
364 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
369
370 update_sender: RoomEventCacheUpdateSender,
372}
373
374impl RoomEventCacheInner {
375 fn new(
378 client: WeakClient,
379 state: RoomEventCacheStateLock,
380 pagination_status: SharedObservable<PaginationStatus>,
381 room_id: OwnedRoomId,
382 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
383 update_sender: RoomEventCacheUpdateSender,
384 ) -> Self {
385 let weak_room = WeakRoom::new(client, room_id);
386
387 Self {
388 room_id: weak_room.room_id().to_owned(),
389 weak_room,
390 state,
391 update_sender,
392 pagination_batch_token_notifier: Default::default(),
393 auto_shrink_sender,
394 pagination_status,
395 }
396 }
397
398 fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
399 if account_data.is_empty() {
400 return;
401 }
402
403 let mut handled_read_marker = false;
404
405 trace!("Handling account data");
406
407 for raw_event in account_data {
408 match raw_event.deserialize() {
409 Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
410 if handled_read_marker {
413 continue;
414 }
415
416 handled_read_marker = true;
417
418 self.update_sender.send(
420 RoomEventCacheUpdate::MoveReadMarkerTo { event_id: ev.content.event_id },
421 None,
422 );
423 }
424
425 Ok(_) => {
426 }
429
430 Err(e) => {
431 let event_type = raw_event.get_field::<String>("type").ok().flatten();
432 warn!(event_type, "Failed to deserialize account data: {e}");
433 }
434 }
435 }
436 }
437
438 async fn handle_timeline(
441 &self,
442 timeline: Timeline,
443 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
444 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
445 ) -> Result<()> {
446 if timeline.events.is_empty()
447 && timeline.prev_batch.is_none()
448 && ephemeral_events.is_empty()
449 && ambiguity_changes.is_empty()
450 {
451 return Ok(());
452 }
453
454 trace!("adding new events");
456
457 let (stored_prev_batch_token, timeline_event_diffs) =
458 self.state.write().await?.handle_sync(timeline).await?;
459
460 if stored_prev_batch_token {
463 self.pagination_batch_token_notifier.notify_one();
464 }
465
466 if !timeline_event_diffs.is_empty() {
469 self.update_sender.send(
470 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
471 diffs: timeline_event_diffs,
472 origin: EventsOrigin::Sync,
473 }),
474 Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
475 );
476 }
477
478 if !ephemeral_events.is_empty() {
479 self.update_sender
480 .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events }, None);
481 }
482
483 if !ambiguity_changes.is_empty() {
484 self.update_sender
485 .send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes }, None);
486 }
487
488 Ok(())
489 }
490}
491
492#[derive(Clone, Copy)]
493pub(in super::super) enum PostProcessingOrigin {
494 Sync,
495 Backpagination,
496 #[cfg(feature = "e2e-encryption")]
497 Redecryption,
498}
499
500pub(in super::super) enum EventLocation {
502 Memory(Position),
504
505 Store,
507}
508
509#[cfg(test)]
510mod tests {
511 use matrix_sdk_base::event_cache::Event;
512 use matrix_sdk_test::{async_test, event_factory::EventFactory};
513 use ruma::{
514 RoomId, event_id,
515 events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
516 room_id, user_id,
517 };
518
519 use crate::test_utils::logged_in_client;
520
521 #[async_test]
522 async fn test_find_event_by_id_with_edit_relation() {
523 let original_id = event_id!("$original");
524 let related_id = event_id!("$related");
525 let room_id = room_id!("!galette:saucisse.bzh");
526 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
527
528 assert_relations(
529 room_id,
530 f.text_msg("Original event").event_id(original_id).into(),
531 f.text_msg("* An edited event")
532 .edit(
533 original_id,
534 RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
535 )
536 .event_id(related_id)
537 .into(),
538 f,
539 )
540 .await;
541 }
542
543 #[async_test]
544 async fn test_find_event_by_id_with_thread_reply_relation() {
545 let original_id = event_id!("$original");
546 let related_id = event_id!("$related");
547 let room_id = room_id!("!galette:saucisse.bzh");
548 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
549
550 assert_relations(
551 room_id,
552 f.text_msg("Original event").event_id(original_id).into(),
553 f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
554 f,
555 )
556 .await;
557 }
558
559 #[async_test]
560 async fn test_find_event_by_id_with_reaction_relation() {
561 let original_id = event_id!("$original");
562 let related_id = event_id!("$related");
563 let room_id = room_id!("!galette:saucisse.bzh");
564 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
565
566 assert_relations(
567 room_id,
568 f.text_msg("Original event").event_id(original_id).into(),
569 f.reaction(original_id, ":D").event_id(related_id).into(),
570 f,
571 )
572 .await;
573 }
574
575 #[async_test]
576 async fn test_find_event_by_id_with_poll_response_relation() {
577 let original_id = event_id!("$original");
578 let related_id = event_id!("$related");
579 let room_id = room_id!("!galette:saucisse.bzh");
580 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
581
582 assert_relations(
583 room_id,
584 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
585 .event_id(original_id)
586 .into(),
587 f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
588 f,
589 )
590 .await;
591 }
592
593 #[async_test]
594 async fn test_find_event_by_id_with_poll_end_relation() {
595 let original_id = event_id!("$original");
596 let related_id = event_id!("$related");
597 let room_id = room_id!("!galette:saucisse.bzh");
598 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
599
600 assert_relations(
601 room_id,
602 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
603 .event_id(original_id)
604 .into(),
605 f.poll_end("Poll ended", original_id).event_id(related_id).into(),
606 f,
607 )
608 .await;
609 }
610
611 #[async_test]
612 async fn test_find_event_by_id_with_filtered_relationships() {
613 let original_id = event_id!("$original");
614 let related_id = event_id!("$related");
615 let associated_related_id = event_id!("$recursive_related");
616 let room_id = room_id!("!galette:saucisse.bzh");
617 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
618
619 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
620 let related_event = event_factory
621 .text_msg("* Edited event")
622 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
623 .event_id(related_id)
624 .into();
625 let associated_related_event =
626 event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
627
628 let client = logged_in_client(None).await;
629
630 let event_cache = client.event_cache();
631 event_cache.subscribe().unwrap();
632
633 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
634 let room = client.get_room(room_id).unwrap();
635
636 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
637
638 room_event_cache.save_events([original_event]).await;
640
641 room_event_cache.save_events([related_event]).await;
643
644 room_event_cache.save_events([associated_related_event]).await;
646
647 let filter = Some(vec![RelationType::Replacement]);
648 let (event, related_events) = room_event_cache
649 .find_event_with_relations(original_id, filter)
650 .await
651 .expect("Failed to find the event with relations")
652 .expect("Event has no relation");
653 let cached_event_id = event.event_id().unwrap();
655 assert_eq!(cached_event_id, original_id);
656
657 assert_eq!(related_events.len(), 1);
659
660 let related_event_id = related_events[0].event_id().unwrap();
661 assert_eq!(related_event_id, related_id);
662
663 let filter = Some(vec![RelationType::Thread]);
665 let (event, related_events) = room_event_cache
666 .find_event_with_relations(original_id, filter)
667 .await
668 .expect("Failed to find the event with relations")
669 .expect("Event has no relation");
670
671 let cached_event_id = event.event_id().unwrap();
673 assert_eq!(cached_event_id, original_id);
674 assert!(related_events.is_empty());
676 }
677
678 #[async_test]
679 async fn test_find_event_by_id_with_recursive_relation() {
680 let original_id = event_id!("$original");
681 let related_id = event_id!("$related");
682 let associated_related_id = event_id!("$recursive_related");
683 let room_id = room_id!("!galette:saucisse.bzh");
684 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
685
686 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
687 let related_event = event_factory
688 .text_msg("* Edited event")
689 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
690 .event_id(related_id)
691 .into();
692 let associated_related_event =
693 event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
694
695 let client = logged_in_client(None).await;
696
697 let event_cache = client.event_cache();
698 event_cache.subscribe().unwrap();
699
700 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
701 let room = client.get_room(room_id).unwrap();
702
703 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
704
705 room_event_cache.save_events([original_event]).await;
707
708 room_event_cache.save_events([related_event]).await;
710
711 room_event_cache.save_events([associated_related_event]).await;
713
714 let (event, related_events) = room_event_cache
715 .find_event_with_relations(original_id, None)
716 .await
717 .expect("Failed to find the event with relations")
718 .expect("Event has no relation");
719 let cached_event_id = event.event_id().unwrap();
721 assert_eq!(cached_event_id, original_id);
722
723 assert_eq!(related_events.len(), 2);
725
726 let related_event_id = related_events[0].event_id().unwrap();
727 assert_eq!(related_event_id, related_id);
728 let related_event_id = related_events[1].event_id().unwrap();
729 assert_eq!(related_event_id, associated_related_id);
730 }
731
732 async fn assert_relations(
733 room_id: &RoomId,
734 original_event: Event,
735 related_event: Event,
736 event_factory: EventFactory,
737 ) {
738 let client = logged_in_client(None).await;
739
740 let event_cache = client.event_cache();
741 event_cache.subscribe().unwrap();
742
743 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
744 let room = client.get_room(room_id).unwrap();
745
746 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
747
748 let original_event_id = original_event.event_id().unwrap();
750 room_event_cache.save_events([original_event]).await;
751
752 let unrelated_id = event_id!("$2");
754 room_event_cache
755 .save_events([event_factory
756 .text_msg("An unrelated event")
757 .event_id(unrelated_id)
758 .into()])
759 .await;
760
761 let related_id = related_event.event_id().unwrap();
763 room_event_cache.save_events([related_event]).await;
764
765 let (event, related_events) = room_event_cache
766 .find_event_with_relations(&original_event_id, None)
767 .await
768 .expect("Failed to find the event with relations")
769 .expect("Event has no relation");
770 let cached_event_id = event.event_id().unwrap();
772 assert_eq!(cached_event_id, original_event_id);
773
774 let related_event_id = related_events[0].event_id().unwrap();
776 assert_eq!(related_event_id, related_id);
777 }
778}
779
780#[cfg(all(test, not(target_family = "wasm")))] mod timed_tests {
782 use std::{ops::Not, sync::Arc};
783
784 use assert_matches::assert_matches;
785 use assert_matches2::assert_let;
786 use eyeball_im::VectorDiff;
787 use futures_util::FutureExt;
788 use matrix_sdk_base::{
789 event_cache::{
790 Gap,
791 store::{EventCacheStore as _, MemoryStore},
792 },
793 linked_chunk::{
794 ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
795 lazy_loader::from_all_chunks,
796 },
797 store::StoreConfig,
798 sync::{JoinedRoomUpdate, Timeline},
799 };
800 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
801 use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
802 use ruma::{
803 EventId, OwnedUserId, event_id,
804 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
805 room_id, user_id,
806 };
807 use tokio::task::yield_now;
808
809 use super::{
810 super::{lock::Reload as _, pagination::LoadMoreEventsBackwardsOutcome},
811 RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
812 };
813 use crate::{
814 assert_let_timeout, event_cache::TimelineVectorDiffs, test_utils::client::MockClientBuilder,
815 };
816
817 #[async_test]
818 async fn test_write_to_storage() {
819 let room_id = room_id!("!galette:saucisse.bzh");
820 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
821
822 let event_cache_store = Arc::new(MemoryStore::new());
823
824 let client = MockClientBuilder::new(None)
825 .on_builder(|builder| {
826 builder.store_config(
827 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
828 .event_cache_store(event_cache_store.clone()),
829 )
830 })
831 .build()
832 .await;
833
834 let event_cache = client.event_cache();
835
836 event_cache.subscribe().unwrap();
838
839 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
840 let room = client.get_room(room_id).unwrap();
841
842 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
843 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
844
845 let timeline = Timeline {
847 limited: true,
848 prev_batch: Some("raclette".to_owned()),
849 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
850 };
851
852 room_event_cache
853 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
854 .await
855 .unwrap();
856
857 assert_matches!(
859 generic_stream.recv().await,
860 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
861 assert_eq!(expected_room_id, room_id);
862 }
863 );
864 assert!(generic_stream.is_empty());
865
866 let linked_chunk = from_all_chunks::<3, _, _>(
868 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
869 )
870 .unwrap()
871 .unwrap();
872
873 assert_eq!(linked_chunk.chunks().count(), 2);
874
875 let mut chunks = linked_chunk.chunks();
876
877 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
879 assert_eq!(gap.prev_token, "raclette");
880 });
881
882 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
884 assert_eq!(events.len(), 1);
885 let deserialized = events[0].raw().deserialize().unwrap();
886 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
887 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
888 });
889
890 assert!(chunks.next().is_none());
892 }
893
894 #[async_test]
895 async fn test_write_to_storage_strips_bundled_relations() {
896 let room_id = room_id!("!galette:saucisse.bzh");
897 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
898
899 let event_cache_store = Arc::new(MemoryStore::new());
900
901 let client = MockClientBuilder::new(None)
902 .on_builder(|builder| {
903 builder.store_config(
904 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
905 .event_cache_store(event_cache_store.clone()),
906 )
907 })
908 .build()
909 .await;
910
911 let event_cache = client.event_cache();
912
913 event_cache.subscribe().unwrap();
915
916 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
917 let room = client.get_room(room_id).unwrap();
918
919 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
920 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
921
922 let ev = f
924 .text_msg("hey yo")
925 .sender(*ALICE)
926 .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
927 .into_event();
928
929 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
930
931 room_event_cache
932 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
933 .await
934 .unwrap();
935
936 assert_matches!(
938 generic_stream.recv().await,
939 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
940 assert_eq!(expected_room_id, room_id);
941 }
942 );
943 assert!(generic_stream.is_empty());
944
945 {
947 let events = room_event_cache.events().await.unwrap();
948
949 assert_eq!(events.len(), 1);
950
951 let ev = events[0].raw().deserialize().unwrap();
952 assert_let!(
953 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
954 );
955
956 let original = msg.as_original().unwrap();
957 assert_eq!(original.content.body(), "hey yo");
958 assert!(original.unsigned.relations.replace.is_some());
959 }
960
961 let linked_chunk = from_all_chunks::<3, _, _>(
963 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
964 )
965 .unwrap()
966 .unwrap();
967
968 assert_eq!(linked_chunk.chunks().count(), 1);
969
970 let mut chunks = linked_chunk.chunks();
971 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
972 assert_eq!(events.len(), 1);
973
974 let ev = events[0].raw().deserialize().unwrap();
975 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
976
977 let original = msg.as_original().unwrap();
978 assert_eq!(original.content.body(), "hey yo");
979 assert!(original.unsigned.relations.replace.is_none());
980 });
981
982 assert!(chunks.next().is_none());
984 }
985
986 #[async_test]
987 async fn test_clear() {
988 let room_id = room_id!("!galette:saucisse.bzh");
989 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
990
991 let event_cache_store = Arc::new(MemoryStore::new());
992
993 let event_id1 = event_id!("$1");
994 let event_id2 = event_id!("$2");
995
996 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
997 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
998
999 event_cache_store
1001 .handle_linked_chunk_updates(
1002 LinkedChunkId::Room(room_id),
1003 vec![
1004 Update::NewItemsChunk {
1006 previous: None,
1007 new: ChunkIdentifier::new(0),
1008 next: None,
1009 },
1010 Update::NewGapChunk {
1012 previous: Some(ChunkIdentifier::new(0)),
1013 new: ChunkIdentifier::new(42),
1015 next: None,
1016 gap: Gap { prev_token: "comté".to_owned() },
1017 },
1018 Update::NewItemsChunk {
1020 previous: Some(ChunkIdentifier::new(42)),
1021 new: ChunkIdentifier::new(1),
1022 next: None,
1023 },
1024 Update::PushItems {
1025 at: Position::new(ChunkIdentifier::new(1), 0),
1026 items: vec![ev1.clone()],
1027 },
1028 Update::NewItemsChunk {
1030 previous: Some(ChunkIdentifier::new(1)),
1031 new: ChunkIdentifier::new(2),
1032 next: None,
1033 },
1034 Update::PushItems {
1035 at: Position::new(ChunkIdentifier::new(2), 0),
1036 items: vec![ev2.clone()],
1037 },
1038 ],
1039 )
1040 .await
1041 .unwrap();
1042
1043 let client = MockClientBuilder::new(None)
1044 .on_builder(|builder| {
1045 builder.store_config(
1046 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1047 .event_cache_store(event_cache_store.clone()),
1048 )
1049 })
1050 .build()
1051 .await;
1052
1053 let event_cache = client.event_cache();
1054
1055 event_cache.subscribe().unwrap();
1057
1058 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1059 let room = client.get_room(room_id).unwrap();
1060
1061 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1062
1063 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
1064 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1065
1066 {
1068 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1069 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
1070 }
1071
1072 {
1074 assert_eq!(items.len(), 1);
1076 assert_eq!(items[0].event_id().unwrap(), event_id2);
1077
1078 assert!(stream.is_empty());
1079 }
1080
1081 {
1083 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1084
1085 assert_let_timeout!(
1086 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1087 stream.recv()
1088 );
1089 assert_eq!(diffs.len(), 1);
1090 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1091 assert_eq!(event.event_id().unwrap(), event_id1);
1093 });
1094
1095 assert!(stream.is_empty());
1096
1097 assert_let_timeout!(
1098 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
1099 generic_stream.recv()
1100 );
1101 assert_eq!(room_id, expected_room_id);
1102 assert!(generic_stream.is_empty());
1103 }
1104
1105 room_event_cache.clear().await.unwrap();
1107
1108 assert_let_timeout!(
1110 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1111 stream.recv()
1112 );
1113 assert_eq!(diffs.len(), 1);
1114 assert_let!(VectorDiff::Clear = &diffs[0]);
1115
1116 assert_let_timeout!(
1118 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
1119 );
1120 assert_eq!(received_room_id, room_id);
1121 assert!(generic_stream.is_empty());
1122
1123 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1126
1127 let items = room_event_cache.events().await.unwrap();
1129 assert!(items.is_empty());
1130
1131 let linked_chunk = from_all_chunks::<3, _, _>(
1133 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
1134 )
1135 .unwrap()
1136 .unwrap();
1137
1138 assert_eq!(linked_chunk.num_items(), 0);
1142 }
1143
1144 #[async_test]
1145 async fn test_load_from_storage() {
1146 let room_id = room_id!("!galette:saucisse.bzh");
1147 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1148
1149 let event_cache_store = Arc::new(MemoryStore::new());
1150
1151 let event_id1 = event_id!("$1");
1152 let event_id2 = event_id!("$2");
1153
1154 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1155 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1156
1157 event_cache_store
1159 .handle_linked_chunk_updates(
1160 LinkedChunkId::Room(room_id),
1161 vec![
1162 Update::NewItemsChunk {
1164 previous: None,
1165 new: ChunkIdentifier::new(0),
1166 next: None,
1167 },
1168 Update::NewGapChunk {
1170 previous: Some(ChunkIdentifier::new(0)),
1171 new: ChunkIdentifier::new(42),
1173 next: None,
1174 gap: Gap { prev_token: "cheddar".to_owned() },
1175 },
1176 Update::NewItemsChunk {
1178 previous: Some(ChunkIdentifier::new(42)),
1179 new: ChunkIdentifier::new(1),
1180 next: None,
1181 },
1182 Update::PushItems {
1183 at: Position::new(ChunkIdentifier::new(1), 0),
1184 items: vec![ev1.clone()],
1185 },
1186 Update::NewItemsChunk {
1188 previous: Some(ChunkIdentifier::new(1)),
1189 new: ChunkIdentifier::new(2),
1190 next: None,
1191 },
1192 Update::PushItems {
1193 at: Position::new(ChunkIdentifier::new(2), 0),
1194 items: vec![ev2.clone()],
1195 },
1196 ],
1197 )
1198 .await
1199 .unwrap();
1200
1201 let client = MockClientBuilder::new(None)
1202 .on_builder(|builder| {
1203 builder.store_config(
1204 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1205 .event_cache_store(event_cache_store.clone()),
1206 )
1207 })
1208 .build()
1209 .await;
1210
1211 let event_cache = client.event_cache();
1212
1213 event_cache.subscribe().unwrap();
1215
1216 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1218
1219 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1220 let room = client.get_room(room_id).unwrap();
1221
1222 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1223
1224 assert_matches!(
1227 generic_stream.recv().await,
1228 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1229 assert_eq!(room_id, expected_room_id);
1230 }
1231 );
1232 assert!(generic_stream.is_empty());
1233
1234 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
1235
1236 assert_eq!(items.len(), 1);
1239 assert_eq!(items[0].event_id().unwrap(), event_id2);
1240 assert!(stream.is_empty());
1241
1242 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1244 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
1245
1246 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1248
1249 assert_let_timeout!(
1250 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1251 stream.recv()
1252 );
1253 assert_eq!(diffs.len(), 1);
1254 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1255 assert_eq!(event.event_id().unwrap(), event_id1);
1256 });
1257
1258 assert!(stream.is_empty());
1259
1260 assert_matches!(
1262 generic_stream.recv().await,
1263 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1264 assert_eq!(expected_room_id, room_id);
1265 }
1266 );
1267 assert!(generic_stream.is_empty());
1268
1269 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
1271
1272 room_event_cache
1273 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1274 .await
1275 .unwrap();
1276
1277 assert!(generic_stream.recv().now_or_never().is_none());
1280
1281 let items = room_event_cache.events().await.unwrap();
1286 assert_eq!(items.len(), 2);
1287 assert_eq!(items[0].event_id().unwrap(), event_id1);
1288 assert_eq!(items[1].event_id().unwrap(), event_id2);
1289 }
1290
1291 #[async_test]
1292 async fn test_load_from_storage_resilient_to_failure() {
1293 let room_id = room_id!("!fondue:patate.ch");
1294 let event_cache_store = Arc::new(MemoryStore::new());
1295
1296 let event = EventFactory::new()
1297 .room(room_id)
1298 .sender(user_id!("@ben:saucisse.bzh"))
1299 .text_msg("foo")
1300 .event_id(event_id!("$42"))
1301 .into_event();
1302
1303 event_cache_store
1305 .handle_linked_chunk_updates(
1306 LinkedChunkId::Room(room_id),
1307 vec![
1308 Update::NewItemsChunk {
1309 previous: None,
1310 new: ChunkIdentifier::new(0),
1311 next: None,
1312 },
1313 Update::PushItems {
1314 at: Position::new(ChunkIdentifier::new(0), 0),
1315 items: vec![event],
1316 },
1317 Update::NewItemsChunk {
1318 previous: Some(ChunkIdentifier::new(0)),
1319 new: ChunkIdentifier::new(1),
1320 next: Some(ChunkIdentifier::new(0)),
1321 },
1322 ],
1323 )
1324 .await
1325 .unwrap();
1326
1327 let client = MockClientBuilder::new(None)
1328 .on_builder(|builder| {
1329 builder.store_config(
1330 StoreConfig::new(CrossProcessLockConfig::multi_process("holder"))
1331 .event_cache_store(event_cache_store.clone()),
1332 )
1333 })
1334 .build()
1335 .await;
1336
1337 let event_cache = client.event_cache();
1338
1339 event_cache.subscribe().unwrap();
1341
1342 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1343 let room = client.get_room(room_id).unwrap();
1344
1345 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1346
1347 let items = room_event_cache.events().await.unwrap();
1348
1349 assert!(items.is_empty());
1352
1353 let raw_chunks =
1356 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
1357 assert!(raw_chunks.is_empty());
1358 }
1359
1360 #[async_test]
1361 async fn test_no_useless_gaps() {
1362 let room_id = room_id!("!galette:saucisse.bzh");
1363
1364 let client = MockClientBuilder::new(None).build().await;
1365
1366 let event_cache = client.event_cache();
1367 event_cache.subscribe().unwrap();
1368
1369 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1370 let room = client.get_room(room_id).unwrap();
1371 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1372 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1373
1374 let f = EventFactory::new().room(room_id).sender(*ALICE);
1375
1376 room_event_cache
1379 .handle_joined_room_update(JoinedRoomUpdate {
1380 timeline: Timeline {
1381 limited: true,
1382 prev_batch: Some("raclette".to_owned()),
1383 events: vec![f.text_msg("hey yo").into_event()],
1384 },
1385 ..Default::default()
1386 })
1387 .await
1388 .unwrap();
1389
1390 assert_matches!(
1392 generic_stream.recv().await,
1393 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1394 assert_eq!(expected_room_id, room_id);
1395 }
1396 );
1397 assert!(generic_stream.is_empty());
1398
1399 {
1400 let mut state = room_event_cache.inner.state.write().await.unwrap();
1401
1402 let mut num_gaps = 0;
1403 let mut num_events = 0;
1404
1405 for c in state.room_linked_chunk().chunks() {
1406 match c.content() {
1407 ChunkContent::Items(items) => num_events += items.len(),
1408 ChunkContent::Gap(_) => num_gaps += 1,
1409 }
1410 }
1411
1412 assert_eq!(num_gaps, 0);
1415 assert_eq!(num_events, 1);
1416
1417 assert_matches!(
1419 state.load_more_events_backwards().await.unwrap(),
1420 LoadMoreEventsBackwardsOutcome::Gap { .. }
1421 );
1422
1423 num_gaps = 0;
1424 num_events = 0;
1425 for c in state.room_linked_chunk().chunks() {
1426 match c.content() {
1427 ChunkContent::Items(items) => num_events += items.len(),
1428 ChunkContent::Gap(_) => num_gaps += 1,
1429 }
1430 }
1431
1432 assert_eq!(num_gaps, 1);
1434 assert_eq!(num_events, 1);
1435 }
1436
1437 room_event_cache
1440 .handle_joined_room_update(JoinedRoomUpdate {
1441 timeline: Timeline {
1442 limited: false,
1443 prev_batch: Some("fondue".to_owned()),
1444 events: vec![f.text_msg("sup").into_event()],
1445 },
1446 ..Default::default()
1447 })
1448 .await
1449 .unwrap();
1450
1451 assert_matches!(
1453 generic_stream.recv().await,
1454 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1455 assert_eq!(expected_room_id, room_id);
1456 }
1457 );
1458 assert!(generic_stream.is_empty());
1459
1460 {
1461 let state = room_event_cache.inner.state.read().await.unwrap();
1462
1463 let mut num_gaps = 0;
1464 let mut num_events = 0;
1465
1466 for c in state.room_linked_chunk().chunks() {
1467 match c.content() {
1468 ChunkContent::Items(items) => num_events += items.len(),
1469 ChunkContent::Gap(gap) => {
1470 assert_eq!(gap.prev_token, "raclette");
1471 num_gaps += 1;
1472 }
1473 }
1474 }
1475
1476 assert_eq!(num_gaps, 1);
1478 assert_eq!(num_events, 2);
1479 }
1480 }
1481
1482 #[async_test]
1483 async fn test_shrink_to_last_chunk() {
1484 let room_id = room_id!("!galette:saucisse.bzh");
1485
1486 let client = MockClientBuilder::new(None).build().await;
1487
1488 let f = EventFactory::new().room(room_id);
1489
1490 let evid1 = event_id!("$1");
1491 let evid2 = event_id!("$2");
1492
1493 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1494 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1495
1496 {
1498 client
1499 .event_cache_store()
1500 .lock()
1501 .await
1502 .expect("Could not acquire the event cache lock")
1503 .as_clean()
1504 .expect("Could not acquire a clean event cache lock")
1505 .handle_linked_chunk_updates(
1506 LinkedChunkId::Room(room_id),
1507 vec![
1508 Update::NewItemsChunk {
1509 previous: None,
1510 new: ChunkIdentifier::new(0),
1511 next: None,
1512 },
1513 Update::PushItems {
1514 at: Position::new(ChunkIdentifier::new(0), 0),
1515 items: vec![ev1],
1516 },
1517 Update::NewItemsChunk {
1518 previous: Some(ChunkIdentifier::new(0)),
1519 new: ChunkIdentifier::new(1),
1520 next: None,
1521 },
1522 Update::PushItems {
1523 at: Position::new(ChunkIdentifier::new(1), 0),
1524 items: vec![ev2],
1525 },
1526 ],
1527 )
1528 .await
1529 .unwrap();
1530 }
1531
1532 let event_cache = client.event_cache();
1533 event_cache.subscribe().unwrap();
1534
1535 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1536 let room = client.get_room(room_id).unwrap();
1537 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1538
1539 let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
1541 assert_eq!(events.len(), 1);
1542 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1543 assert!(stream.is_empty());
1544
1545 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1546
1547 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1549 assert_eq!(outcome.events.len(), 1);
1550 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1551 assert!(outcome.reached_start);
1552
1553 assert_let_timeout!(
1555 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1556 stream.recv()
1557 );
1558 assert_eq!(diffs.len(), 1);
1559 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1560 assert_eq!(value.event_id().as_deref(), Some(evid1));
1561 });
1562
1563 assert!(stream.is_empty());
1564
1565 assert_let_timeout!(
1567 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1568 );
1569 assert_eq!(expected_room_id, room_id);
1570 assert!(generic_stream.is_empty());
1571
1572 room_event_cache
1574 .inner
1575 .state
1576 .write()
1577 .await
1578 .unwrap()
1579 .reload()
1580 .await
1581 .expect("shrinking should succeed");
1582
1583 assert_let_timeout!(
1585 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1586 stream.recv()
1587 );
1588 assert_eq!(diffs.len(), 2);
1589 assert_matches!(&diffs[0], VectorDiff::Clear);
1590 assert_matches!(&diffs[1], VectorDiff::Append { values} => {
1591 assert_eq!(values.len(), 1);
1592 assert_eq!(values[0].event_id().as_deref(), Some(evid2));
1593 });
1594
1595 assert!(stream.is_empty());
1596
1597 assert_let_timeout!(Ok(RoomEventCacheGenericUpdate { .. }) = generic_stream.recv());
1599 assert!(generic_stream.is_empty());
1600
1601 let events = room_event_cache.events().await.unwrap();
1603 assert_eq!(events.len(), 1);
1604 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1605
1606 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1609 assert_eq!(outcome.events.len(), 1);
1610 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1611 assert!(outcome.reached_start);
1612 }
1613
1614 #[async_test]
1615 async fn test_room_ordering() {
1616 let room_id = room_id!("!galette:saucisse.bzh");
1617
1618 let client = MockClientBuilder::new(None).build().await;
1619
1620 let f = EventFactory::new().room(room_id).sender(*ALICE);
1621
1622 let evid1 = event_id!("$1");
1623 let evid2 = event_id!("$2");
1624 let evid3 = event_id!("$3");
1625
1626 let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
1627 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1628 let ev3 = f.text_msg("yo").event_id(evid3).into_event();
1629
1630 {
1632 client
1633 .event_cache_store()
1634 .lock()
1635 .await
1636 .expect("Could not acquire the event cache lock")
1637 .as_clean()
1638 .expect("Could not acquire a clean event cache lock")
1639 .handle_linked_chunk_updates(
1640 LinkedChunkId::Room(room_id),
1641 vec![
1642 Update::NewItemsChunk {
1643 previous: None,
1644 new: ChunkIdentifier::new(0),
1645 next: None,
1646 },
1647 Update::PushItems {
1648 at: Position::new(ChunkIdentifier::new(0), 0),
1649 items: vec![ev1, ev2],
1650 },
1651 Update::NewItemsChunk {
1652 previous: Some(ChunkIdentifier::new(0)),
1653 new: ChunkIdentifier::new(1),
1654 next: None,
1655 },
1656 Update::PushItems {
1657 at: Position::new(ChunkIdentifier::new(1), 0),
1658 items: vec![ev3.clone()],
1659 },
1660 ],
1661 )
1662 .await
1663 .unwrap();
1664 }
1665
1666 let event_cache = client.event_cache();
1667 event_cache.subscribe().unwrap();
1668
1669 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1670 let room = client.get_room(room_id).unwrap();
1671 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1672
1673 {
1676 let state = room_event_cache.inner.state.read().await.unwrap();
1677 let room_linked_chunk = state.room_linked_chunk();
1678
1679 assert_eq!(
1681 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1682 Some(0)
1683 );
1684
1685 assert_eq!(
1687 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1688 Some(1)
1689 );
1690
1691 let mut events = room_linked_chunk.events();
1693 let (pos, ev) = events.next().unwrap();
1694 assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
1695 assert_eq!(ev.event_id().as_deref(), Some(evid3));
1696 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1697
1698 assert!(events.next().is_none());
1700 }
1701
1702 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1704 assert!(outcome.reached_start);
1705
1706 {
1709 let state = room_event_cache.inner.state.read().await.unwrap();
1710 let room_linked_chunk = state.room_linked_chunk();
1711
1712 for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
1713 assert_eq!(room_linked_chunk.event_order(pos), Some(i));
1714 }
1715 }
1716
1717 let evid4 = event_id!("$4");
1722 room_event_cache
1723 .handle_joined_room_update(JoinedRoomUpdate {
1724 timeline: Timeline {
1725 limited: true,
1726 prev_batch: Some("fondue".to_owned()),
1727 events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
1728 },
1729 ..Default::default()
1730 })
1731 .await
1732 .unwrap();
1733
1734 {
1735 let state = room_event_cache.inner.state.read().await.unwrap();
1736 let room_linked_chunk = state.room_linked_chunk();
1737
1738 let mut events = room_linked_chunk.events();
1740
1741 let (pos, ev) = events.next().unwrap();
1742 assert_eq!(ev.event_id().as_deref(), Some(evid3));
1743 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1744
1745 let (pos, ev) = events.next().unwrap();
1746 assert_eq!(ev.event_id().as_deref(), Some(evid4));
1747 assert_eq!(room_linked_chunk.event_order(pos), Some(3));
1748
1749 assert!(events.next().is_none());
1751
1752 assert_eq!(
1754 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1755 Some(0)
1756 );
1757 assert_eq!(
1758 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1759 Some(1)
1760 );
1761
1762 assert_eq!(
1765 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
1766 None
1767 );
1768 }
1769 }
1770
1771 #[async_test]
1772 async fn test_auto_shrink_after_all_subscribers_are_gone() {
1773 let room_id = room_id!("!galette:saucisse.bzh");
1774
1775 let client = MockClientBuilder::new(None).build().await;
1776
1777 let f = EventFactory::new().room(room_id);
1778
1779 let evid1 = event_id!("$1");
1780 let evid2 = event_id!("$2");
1781
1782 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1783 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1784
1785 {
1787 client
1788 .event_cache_store()
1789 .lock()
1790 .await
1791 .expect("Could not acquire the event cache lock")
1792 .as_clean()
1793 .expect("Could not acquire a clean event cache lock")
1794 .handle_linked_chunk_updates(
1795 LinkedChunkId::Room(room_id),
1796 vec![
1797 Update::NewItemsChunk {
1798 previous: None,
1799 new: ChunkIdentifier::new(0),
1800 next: None,
1801 },
1802 Update::PushItems {
1803 at: Position::new(ChunkIdentifier::new(0), 0),
1804 items: vec![ev1],
1805 },
1806 Update::NewItemsChunk {
1807 previous: Some(ChunkIdentifier::new(0)),
1808 new: ChunkIdentifier::new(1),
1809 next: None,
1810 },
1811 Update::PushItems {
1812 at: Position::new(ChunkIdentifier::new(1), 0),
1813 items: vec![ev2],
1814 },
1815 ],
1816 )
1817 .await
1818 .unwrap();
1819 }
1820
1821 let event_cache = client.event_cache();
1822 event_cache.subscribe().unwrap();
1823
1824 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1825 let room = client.get_room(room_id).unwrap();
1826 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1827
1828 let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
1830 assert_eq!(events1.len(), 1);
1831 assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
1832 assert!(stream1.is_empty());
1833
1834 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1835
1836 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1838 assert_eq!(outcome.events.len(), 1);
1839 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1840 assert!(outcome.reached_start);
1841
1842 assert_let_timeout!(
1845 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1846 stream1.recv()
1847 );
1848 assert_eq!(diffs.len(), 1);
1849 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1850 assert_eq!(value.event_id().as_deref(), Some(evid1));
1851 });
1852
1853 assert!(stream1.is_empty());
1854
1855 assert_let_timeout!(
1856 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1857 );
1858 assert_eq!(expected_room_id, room_id);
1859 assert!(generic_stream.is_empty());
1860
1861 let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
1865 assert_eq!(events2.len(), 2);
1866 assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
1867 assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
1868 assert!(stream2.is_empty());
1869
1870 drop(stream1);
1872 yield_now().await;
1873
1874 assert!(stream2.is_empty());
1876
1877 drop(stream2);
1879 yield_now().await;
1880
1881 {
1884 let state = room_event_cache.inner.state.read().await.unwrap();
1886 assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
1887 }
1888
1889 let events3 = room_event_cache.events().await.unwrap();
1891 assert_eq!(events3.len(), 1);
1892 assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
1893 }
1894
1895 #[async_test]
1896 async fn test_rfind_map_event_in_memory_by() {
1897 let user_id = user_id!("@mnt_io:matrix.org");
1898 let room_id = room_id!("!raclette:patate.ch");
1899 let client = MockClientBuilder::new(None).build().await;
1900
1901 let event_factory = EventFactory::new().room(room_id);
1902
1903 let event_id_0 = event_id!("$ev0");
1904 let event_id_1 = event_id!("$ev1");
1905 let event_id_2 = event_id!("$ev2");
1906 let event_id_3 = event_id!("$ev3");
1907
1908 let event_0 =
1909 event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
1910 let event_1 =
1911 event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
1912 let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
1913 let event_3 =
1914 event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
1915
1916 {
1919 client
1920 .event_cache_store()
1921 .lock()
1922 .await
1923 .expect("Could not acquire the event cache lock")
1924 .as_clean()
1925 .expect("Could not acquire a clean event cache lock")
1926 .handle_linked_chunk_updates(
1927 LinkedChunkId::Room(room_id),
1928 vec![
1929 Update::NewItemsChunk {
1930 previous: None,
1931 new: ChunkIdentifier::new(0),
1932 next: None,
1933 },
1934 Update::PushItems {
1935 at: Position::new(ChunkIdentifier::new(0), 0),
1936 items: vec![event_3],
1937 },
1938 Update::NewItemsChunk {
1939 previous: Some(ChunkIdentifier::new(0)),
1940 new: ChunkIdentifier::new(1),
1941 next: None,
1942 },
1943 Update::PushItems {
1944 at: Position::new(ChunkIdentifier::new(1), 0),
1945 items: vec![event_0, event_1, event_2],
1946 },
1947 ],
1948 )
1949 .await
1950 .unwrap();
1951 }
1952
1953 let event_cache = client.event_cache();
1954 event_cache.subscribe().unwrap();
1955
1956 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1957 let room = client.get_room(room_id).unwrap();
1958 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1959
1960 assert_matches!(
1962 room_event_cache
1963 .rfind_map_event_in_memory_by(|event| {
1964 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| event.event_id())
1965 })
1966 .await,
1967 Ok(Some(event_id)) => {
1968 assert_eq!(event_id.as_deref(), Some(event_id_0));
1969 }
1970 );
1971
1972 assert_matches!(
1975 room_event_cache
1976 .rfind_map_event_in_memory_by(|event| {
1977 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| event.event_id())
1978 })
1979 .await,
1980 Ok(Some(event_id)) => {
1981 assert_eq!(event_id.as_deref(), Some(event_id_2));
1982 }
1983 );
1984
1985 assert!(
1987 room_event_cache
1988 .rfind_map_event_in_memory_by(|event| {
1989 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
1990 == Some(user_id))
1991 .then(|| event.event_id())
1992 })
1993 .await
1994 .unwrap()
1995 .is_none()
1996 );
1997
1998 assert!(
2000 room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap().is_none()
2001 );
2002 }
2003
2004 #[async_test]
2005 async fn test_reload_when_dirty() {
2006 let user_id = user_id!("@mnt_io:matrix.org");
2007 let room_id = room_id!("!raclette:patate.ch");
2008
2009 let event_cache_store = MemoryStore::new();
2011
2012 let client_p0 = MockClientBuilder::new(None)
2014 .on_builder(|builder| {
2015 builder.store_config(
2016 StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2017 .event_cache_store(event_cache_store.clone()),
2018 )
2019 })
2020 .build()
2021 .await;
2022
2023 let client_p1 = MockClientBuilder::new(None)
2025 .on_builder(|builder| {
2026 builder.store_config(
2027 StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2028 .event_cache_store(event_cache_store),
2029 )
2030 })
2031 .build()
2032 .await;
2033
2034 let event_factory = EventFactory::new().room(room_id).sender(user_id);
2035
2036 let ev_id_0 = event_id!("$ev_0");
2037 let ev_id_1 = event_id!("$ev_1");
2038
2039 let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
2040 let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
2041
2042 client_p0
2044 .event_cache_store()
2045 .lock()
2046 .await
2047 .expect("[p0] Could not acquire the event cache lock")
2048 .as_clean()
2049 .expect("[p0] Could not acquire a clean event cache lock")
2050 .handle_linked_chunk_updates(
2051 LinkedChunkId::Room(room_id),
2052 vec![
2053 Update::NewItemsChunk {
2054 previous: None,
2055 new: ChunkIdentifier::new(0),
2056 next: None,
2057 },
2058 Update::PushItems {
2059 at: Position::new(ChunkIdentifier::new(0), 0),
2060 items: vec![ev_0],
2061 },
2062 Update::NewItemsChunk {
2063 previous: Some(ChunkIdentifier::new(0)),
2064 new: ChunkIdentifier::new(1),
2065 next: None,
2066 },
2067 Update::PushItems {
2068 at: Position::new(ChunkIdentifier::new(1), 0),
2069 items: vec![ev_1],
2070 },
2071 ],
2072 )
2073 .await
2074 .unwrap();
2075
2076 let (room_event_cache_p0, room_event_cache_p1) = {
2078 let event_cache_p0 = client_p0.event_cache();
2079 event_cache_p0.subscribe().unwrap();
2080
2081 let event_cache_p1 = client_p1.event_cache();
2082 event_cache_p1.subscribe().unwrap();
2083
2084 client_p0.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2085 client_p1.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2086
2087 let (room_event_cache_p0, _drop_handles) =
2088 client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
2089 let (room_event_cache_p1, _drop_handles) =
2090 client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
2091
2092 (room_event_cache_p0, room_event_cache_p1)
2093 };
2094
2095 let mut updates_stream_p0 = {
2100 let room_event_cache = &room_event_cache_p0;
2101
2102 let (initial_updates, mut updates_stream) =
2103 room_event_cache_p0.subscribe().await.unwrap();
2104
2105 assert_eq!(initial_updates.len(), 1);
2107 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2108 assert!(updates_stream.is_empty());
2109
2110 assert!(event_loaded(room_event_cache, ev_id_1).await);
2112
2113 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2115
2116 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2118
2119 assert_matches!(
2121 updates_stream.recv().await.unwrap(),
2122 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2123 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2124 assert_matches!(
2125 &diffs[0],
2126 VectorDiff::Insert { index: 0, value: event } => {
2127 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2128 }
2129 );
2130 }
2131 );
2132
2133 assert!(event_loaded(room_event_cache, ev_id_0).await);
2135
2136 updates_stream
2137 };
2138
2139 let mut updates_stream_p1 = {
2141 let room_event_cache = &room_event_cache_p1;
2142 let (initial_updates, mut updates_stream) =
2143 room_event_cache_p1.subscribe().await.unwrap();
2144
2145 assert_eq!(initial_updates.len(), 1);
2147 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2148 assert!(updates_stream.is_empty());
2149
2150 assert!(event_loaded(room_event_cache, ev_id_1).await);
2152
2153 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2155
2156 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2158
2159 assert_matches!(
2161 updates_stream.recv().await.unwrap(),
2162 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2163 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2164 assert_matches!(
2165 &diffs[0],
2166 VectorDiff::Insert { index: 0, value: event } => {
2167 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2168 }
2169 );
2170 }
2171 );
2172
2173 assert!(event_loaded(room_event_cache, ev_id_0).await);
2175
2176 updates_stream
2177 };
2178
2179 for _ in 0..3 {
2181 {
2185 let room_event_cache = &room_event_cache_p0;
2186 let updates_stream = &mut updates_stream_p0;
2187
2188 assert!(event_loaded(room_event_cache, ev_id_1).await);
2190
2191 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2194
2195 assert_matches!(
2197 updates_stream.recv().await.unwrap(),
2198 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2199 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2200 assert_matches!(&diffs[0], VectorDiff::Clear);
2201 assert_matches!(
2202 &diffs[1],
2203 VectorDiff::Append { values: events } => {
2204 assert_eq!(events.len(), 1);
2205 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2206 }
2207 );
2208 }
2209 );
2210
2211 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2213
2214 assert!(event_loaded(room_event_cache, ev_id_0).await);
2216
2217 assert_matches!(
2219 updates_stream.recv().await.unwrap(),
2220 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2221 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2222 assert_matches!(
2223 &diffs[0],
2224 VectorDiff::Insert { index: 0, value: event } => {
2225 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2226 }
2227 );
2228 }
2229 );
2230 }
2231
2232 {
2236 let room_event_cache = &room_event_cache_p1;
2237 let updates_stream = &mut updates_stream_p1;
2238
2239 assert!(event_loaded(room_event_cache, ev_id_1).await);
2241
2242 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2245
2246 assert_matches!(
2248 updates_stream.recv().await.unwrap(),
2249 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2250 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2251 assert_matches!(&diffs[0], VectorDiff::Clear);
2252 assert_matches!(
2253 &diffs[1],
2254 VectorDiff::Append { values: events } => {
2255 assert_eq!(events.len(), 1);
2256 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2257 }
2258 );
2259 }
2260 );
2261
2262 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2264
2265 assert!(event_loaded(room_event_cache, ev_id_0).await);
2267
2268 assert_matches!(
2270 updates_stream.recv().await.unwrap(),
2271 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2272 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2273 assert_matches!(
2274 &diffs[0],
2275 VectorDiff::Insert { index: 0, value: event } => {
2276 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2277 }
2278 );
2279 }
2280 );
2281 }
2282 }
2283
2284 for _ in 0..3 {
2287 {
2288 let room_event_cache = &room_event_cache_p0;
2289 let updates_stream = &mut updates_stream_p0;
2290
2291 let guard = room_event_cache.inner.state.read().await.unwrap();
2292
2293 assert!(guard.is_dirty().not());
2299
2300 assert_matches!(
2302 updates_stream.recv().await.unwrap(),
2303 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2304 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2305 assert_matches!(&diffs[0], VectorDiff::Clear);
2306 assert_matches!(
2307 &diffs[1],
2308 VectorDiff::Append { values: events } => {
2309 assert_eq!(events.len(), 1);
2310 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2311 }
2312 );
2313 }
2314 );
2315
2316 assert!(event_loaded(room_event_cache, ev_id_1).await);
2317 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2318
2319 drop(guard);
2325
2326 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2327 assert!(event_loaded(room_event_cache, ev_id_0).await);
2328
2329 assert_matches!(
2331 updates_stream.recv().await.unwrap(),
2332 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2333 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2334 assert_matches!(
2335 &diffs[0],
2336 VectorDiff::Insert { index: 0, value: event } => {
2337 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2338 }
2339 );
2340 }
2341 );
2342 }
2343
2344 {
2345 let room_event_cache = &room_event_cache_p1;
2346 let updates_stream = &mut updates_stream_p1;
2347
2348 let guard = room_event_cache.inner.state.read().await.unwrap();
2349
2350 assert!(guard.is_dirty().not());
2355
2356 assert_matches!(
2358 updates_stream.recv().await.unwrap(),
2359 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2360 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2361 assert_matches!(&diffs[0], VectorDiff::Clear);
2362 assert_matches!(
2363 &diffs[1],
2364 VectorDiff::Append { values: events } => {
2365 assert_eq!(events.len(), 1);
2366 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2367 }
2368 );
2369 }
2370 );
2371
2372 assert!(event_loaded(room_event_cache, ev_id_1).await);
2373 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2374
2375 drop(guard);
2381
2382 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2383 assert!(event_loaded(room_event_cache, ev_id_0).await);
2384
2385 assert_matches!(
2387 updates_stream.recv().await.unwrap(),
2388 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2389 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2390 assert_matches!(
2391 &diffs[0],
2392 VectorDiff::Insert { index: 0, value: event } => {
2393 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2394 }
2395 );
2396 }
2397 );
2398 }
2399 }
2400
2401 for _ in 0..3 {
2403 {
2404 let room_event_cache = &room_event_cache_p0;
2405 let updates_stream = &mut updates_stream_p0;
2406
2407 let guard = room_event_cache.inner.state.write().await.unwrap();
2408
2409 assert!(guard.is_dirty().not());
2411
2412 assert_matches!(
2414 updates_stream.recv().await.unwrap(),
2415 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2416 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2417 assert_matches!(&diffs[0], VectorDiff::Clear);
2418 assert_matches!(
2419 &diffs[1],
2420 VectorDiff::Append { values: events } => {
2421 assert_eq!(events.len(), 1);
2422 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2423 }
2424 );
2425 }
2426 );
2427
2428 drop(guard);
2431
2432 assert!(event_loaded(room_event_cache, ev_id_1).await);
2433 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2434
2435 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2436 assert!(event_loaded(room_event_cache, ev_id_0).await);
2437
2438 assert_matches!(
2440 updates_stream.recv().await.unwrap(),
2441 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2442 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2443 assert_matches!(
2444 &diffs[0],
2445 VectorDiff::Insert { index: 0, value: event } => {
2446 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2447 }
2448 );
2449 }
2450 );
2451 }
2452
2453 {
2454 let room_event_cache = &room_event_cache_p1;
2455 let updates_stream = &mut updates_stream_p1;
2456
2457 let guard = room_event_cache.inner.state.write().await.unwrap();
2458
2459 assert!(guard.is_dirty().not());
2461
2462 assert_matches!(
2464 updates_stream.recv().await.unwrap(),
2465 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2466 assert_eq!(diffs.len(), 2, "{diffs:#?}");
2467 assert_matches!(&diffs[0], VectorDiff::Clear);
2468 assert_matches!(
2469 &diffs[1],
2470 VectorDiff::Append { values: events } => {
2471 assert_eq!(events.len(), 1);
2472 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2473 }
2474 );
2475 }
2476 );
2477
2478 drop(guard);
2481
2482 assert!(event_loaded(room_event_cache, ev_id_1).await);
2483 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2484
2485 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2486 assert!(event_loaded(room_event_cache, ev_id_0).await);
2487
2488 assert_matches!(
2490 updates_stream.recv().await.unwrap(),
2491 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2492 assert_eq!(diffs.len(), 1, "{diffs:#?}");
2493 assert_matches!(
2494 &diffs[0],
2495 VectorDiff::Insert { index: 0, value: event } => {
2496 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2497 }
2498 );
2499 }
2500 );
2501 }
2502 }
2503 }
2504
2505 #[async_test]
2506 async fn test_load_when_dirty() {
2507 let room_id_0 = room_id!("!raclette:patate.ch");
2508 let room_id_1 = room_id!("!morbiflette:patate.ch");
2509
2510 let event_cache_store = MemoryStore::new();
2512
2513 let client_p0 = MockClientBuilder::new(None)
2515 .on_builder(|builder| {
2516 builder.store_config(
2517 StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2518 .event_cache_store(event_cache_store.clone()),
2519 )
2520 })
2521 .build()
2522 .await;
2523
2524 let client_p1 = MockClientBuilder::new(None)
2526 .on_builder(|builder| {
2527 builder.store_config(
2528 StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2529 .event_cache_store(event_cache_store),
2530 )
2531 })
2532 .build()
2533 .await;
2534
2535 let (room_event_cache_0_p0, room_event_cache_0_p1) = {
2537 let event_cache_p0 = client_p0.event_cache();
2538 event_cache_p0.subscribe().unwrap();
2539
2540 let event_cache_p1 = client_p1.event_cache();
2541 event_cache_p1.subscribe().unwrap();
2542
2543 client_p0
2544 .base_client()
2545 .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
2546 client_p0
2547 .base_client()
2548 .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
2549
2550 client_p1
2551 .base_client()
2552 .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
2553 client_p1
2554 .base_client()
2555 .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
2556
2557 let (room_event_cache_0_p0, _drop_handles) =
2558 client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2559 let (room_event_cache_0_p1, _drop_handles) =
2560 client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2561
2562 (room_event_cache_0_p0, room_event_cache_0_p1)
2563 };
2564
2565 {
2567 drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
2568 drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
2569 }
2570
2571 let (room_event_cache_1_p0, _) =
2575 client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
2576
2577 {
2579 let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
2580 assert!(guard.is_dirty().not());
2581 }
2582
2583 }
2586
2587 async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
2588 room_event_cache
2589 .rfind_map_event_in_memory_by(|event| {
2590 (event.event_id().as_deref() == Some(event_id)).then_some(())
2591 })
2592 .await
2593 .unwrap()
2594 .is_some()
2595 }
2596}