1use std::{
18 collections::BTreeMap,
19 fmt,
20 ops::{Deref, DerefMut},
21 sync::{
22 Arc,
23 atomic::{AtomicUsize, Ordering},
24 },
25};
26
27use events::sort_positions_descending;
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31 deserialized_responses::AmbiguityChange,
32 event_cache::Event,
33 linked_chunk::Position,
34 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
35};
36use ruma::{
37 EventId, OwnedEventId, OwnedRoomId, RoomId,
38 api::Direction,
39 events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
40 serde::Raw,
41};
42use tokio::sync::{
43 Notify,
44 broadcast::{Receiver, Sender},
45 mpsc,
46};
47use tracing::{instrument, trace, warn};
48
49use super::{
50 AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
51 RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
52};
53use crate::{
54 client::WeakClient,
55 event_cache::EventCacheError,
56 room::{IncludeRelations, RelationsOptions, WeakRoom},
57};
58
59pub(super) mod events;
60mod pinned_events;
61mod threads;
62
63pub use threads::ThreadEventCacheUpdate;
64
65#[derive(Clone)]
69pub struct RoomEventCache {
70 pub(super) inner: Arc<RoomEventCacheInner>,
71}
72
73impl fmt::Debug for RoomEventCache {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 f.debug_struct("RoomEventCache").finish_non_exhaustive()
76 }
77}
78
79#[allow(missing_debug_implementations)]
89pub struct RoomEventCacheSubscriber {
90 recv: Receiver<RoomEventCacheUpdate>,
92
93 room_id: OwnedRoomId,
95
96 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
98
99 subscriber_count: Arc<AtomicUsize>,
101}
102
103impl Drop for RoomEventCacheSubscriber {
104 fn drop(&mut self) {
105 let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
106
107 trace!(
108 "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
109 );
110
111 if previous_subscriber_count == 1 {
112 let mut room_id = self.room_id.clone();
116
117 let mut num_attempts = 0;
123
124 while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
125 num_attempts += 1;
126
127 if num_attempts > 1024 {
128 warn!(
131 "couldn't send notification to the auto-shrink channel \
132 after 1024 attempts; giving up"
133 );
134 return;
135 }
136
137 match err {
138 mpsc::error::TrySendError::Full(stolen_room_id) => {
139 room_id = stolen_room_id;
140 }
141 mpsc::error::TrySendError::Closed(_) => return,
142 }
143 }
144
145 trace!("sent notification to the parent channel that we were the last subscriber");
146 }
147 }
148}
149
150impl Deref for RoomEventCacheSubscriber {
151 type Target = Receiver<RoomEventCacheUpdate>;
152
153 fn deref(&self) -> &Self::Target {
154 &self.recv
155 }
156}
157
158impl DerefMut for RoomEventCacheSubscriber {
159 fn deref_mut(&mut self) -> &mut Self::Target {
160 &mut self.recv
161 }
162}
163
164impl RoomEventCache {
165 pub(super) fn new(
167 client: WeakClient,
168 state: RoomEventCacheStateLock,
169 pagination_status: SharedObservable<RoomPaginationStatus>,
170 room_id: OwnedRoomId,
171 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
172 update_sender: Sender<RoomEventCacheUpdate>,
173 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
174 ) -> Self {
175 Self {
176 inner: Arc::new(RoomEventCacheInner::new(
177 client,
178 state,
179 pagination_status,
180 room_id,
181 auto_shrink_sender,
182 update_sender,
183 generic_update_sender,
184 )),
185 }
186 }
187
188 pub fn room_id(&self) -> &RoomId {
190 &self.inner.room_id
191 }
192
193 pub async fn events(&self) -> Result<Vec<Event>> {
198 let state = self.inner.state.read().await?;
199
200 Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
201 }
202
203 pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
210 let state = self.inner.state.read().await?;
211 let events =
212 state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
213
214 let subscriber_count = state.subscriber_count();
215 let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
216 trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
217
218 let recv = self.inner.update_sender.subscribe();
219 let subscriber = RoomEventCacheSubscriber {
220 recv,
221 room_id: self.inner.room_id.clone(),
222 auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
223 subscriber_count: subscriber_count.clone(),
224 };
225
226 Ok((events, subscriber))
227 }
228
229 pub async fn subscribe_to_thread(
232 &self,
233 thread_root: OwnedEventId,
234 ) -> Result<(Vec<Event>, Receiver<ThreadEventCacheUpdate>)> {
235 let mut state = self.inner.state.write().await?;
236 Ok(state.subscribe_to_thread(thread_root))
237 }
238
239 pub async fn subscribe_to_pinned_events(
249 &self,
250 ) -> Result<(Vec<Event>, Receiver<RoomEventCacheUpdate>)> {
251 let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
252 let mut state = self.inner.state.write().await?;
253
254 state.subscribe_to_pinned_events(room).await
255 }
256
257 #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
262 pub async fn paginate_thread_backwards(
263 &self,
264 thread_root: OwnedEventId,
265 num_events: u16,
266 ) -> Result<bool> {
267 let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
268
269 let mut outcome =
271 self.inner.state.write().await?.load_more_thread_events_backwards(thread_root.clone());
272
273 loop {
274 match outcome {
275 LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
276 let options = RelationsOptions {
278 from: prev_token.clone(),
279 dir: Direction::Backward,
280 limit: Some(num_events.into()),
281 include_relations: IncludeRelations::AllRelations,
282 recurse: true,
283 };
284
285 let mut result = room
286 .relations(thread_root.clone(), options)
287 .await
288 .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
289
290 let reached_start = result.next_batch_token.is_none();
291 trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");
292
293 let root_event =
296 if reached_start {
297 Some(room.load_or_fetch_event(&thread_root, None).await.map_err(
299 |err| EventCacheError::BackpaginationError(Box::new(err)),
300 )?)
301 } else {
302 None
303 };
304
305 let mut state = self.inner.state.write().await?;
306
307 state.save_events(result.chunk.iter().cloned()).await?;
309
310 result.chunk.extend(root_event);
313
314 if let Some(outcome) = state.finish_thread_network_pagination(
315 thread_root.clone(),
316 prev_token,
317 result.next_batch_token,
318 result.chunk,
319 ) {
320 return Ok(outcome.reached_start);
321 }
322
323 outcome = state.load_more_thread_events_backwards(thread_root.clone());
325 }
326
327 LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
328 return Ok(true);
330 }
331
332 LoadMoreEventsBackwardsOutcome::Events { .. } => {
333 unimplemented!("loading from disk for threads is not implemented yet");
335 }
336 }
337 }
338 }
339
340 pub fn pagination(&self) -> RoomPagination {
343 RoomPagination { inner: self.inner.clone() }
344 }
345
346 pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
355 where
356 P: FnMut(&Event, Option<&Event>) -> Option<O>,
357 {
358 Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
359 }
360
361 pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
366 Ok(self
367 .inner
368 .state
369 .read()
370 .await?
371 .find_event(event_id)
372 .await
373 .ok()
374 .flatten()
375 .map(|(_loc, event)| event))
376 }
377
378 pub async fn find_event_with_relations(
390 &self,
391 event_id: &EventId,
392 filter: Option<Vec<RelationType>>,
393 ) -> Result<Option<(Event, Vec<Event>)>> {
394 Ok(self
396 .inner
397 .state
398 .read()
399 .await?
400 .find_event_with_relations(event_id, filter.clone())
401 .await
402 .ok()
403 .flatten())
404 }
405
406 pub async fn find_event_relations(
418 &self,
419 event_id: &EventId,
420 filter: Option<Vec<RelationType>>,
421 ) -> Result<Vec<Event>> {
422 self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
424 }
425
426 pub async fn clear(&self) -> Result<()> {
431 let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
433
434 let _ = self.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
436 diffs: updates_as_vector_diffs,
437 origin: EventsOrigin::Cache,
438 });
439
440 let _ = self
442 .inner
443 .generic_update_sender
444 .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });
445
446 Ok(())
447 }
448
449 pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
451 self.inner
452 .handle_timeline(
453 Timeline { limited: false, prev_batch: None, events: vec![event] },
454 Vec::new(),
455 BTreeMap::new(),
456 )
457 .await
458 }
459
460 pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
463 match self.inner.state.write().await {
464 Ok(mut state_guard) => {
465 if let Err(err) = state_guard.save_events(events).await {
466 warn!("couldn't save event in the event cache: {err}");
467 }
468 }
469
470 Err(err) => {
471 warn!("couldn't save event in the event cache: {err}");
472 }
473 }
474 }
475
476 pub async fn debug_string(&self) -> Vec<String> {
479 match self.inner.state.read().await {
480 Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
481 Err(err) => {
482 warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
483
484 vec![]
485 }
486 }
487 }
488}
489
490pub(super) struct RoomEventCacheInner {
492 pub(super) room_id: OwnedRoomId,
494
495 pub weak_room: WeakRoom,
496
497 pub state: RoomEventCacheStateLock,
499
500 pub pagination_batch_token_notifier: Notify,
502
503 pub pagination_status: SharedObservable<RoomPaginationStatus>,
504
505 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
510
511 pub update_sender: Sender<RoomEventCacheUpdate>,
513
514 pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
520}
521
522impl RoomEventCacheInner {
523 fn new(
526 client: WeakClient,
527 state: RoomEventCacheStateLock,
528 pagination_status: SharedObservable<RoomPaginationStatus>,
529 room_id: OwnedRoomId,
530 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
531 update_sender: Sender<RoomEventCacheUpdate>,
532 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
533 ) -> Self {
534 let weak_room = WeakRoom::new(client, room_id);
535
536 Self {
537 room_id: weak_room.room_id().to_owned(),
538 weak_room,
539 state,
540 update_sender,
541 pagination_batch_token_notifier: Default::default(),
542 auto_shrink_sender,
543 pagination_status,
544 generic_update_sender,
545 }
546 }
547
548 fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
549 if account_data.is_empty() {
550 return;
551 }
552
553 let mut handled_read_marker = false;
554
555 trace!("Handling account data");
556
557 for raw_event in account_data {
558 match raw_event.deserialize() {
559 Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
560 if handled_read_marker {
563 continue;
564 }
565
566 handled_read_marker = true;
567
568 let _ = self.update_sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
570 event_id: ev.content.event_id,
571 });
572 }
573
574 Ok(_) => {
575 }
578
579 Err(e) => {
580 let event_type = raw_event.get_field::<String>("type").ok().flatten();
581 warn!(event_type, "Failed to deserialize account data: {e}");
582 }
583 }
584 }
585 }
586
587 #[instrument(skip_all, fields(room_id = %self.room_id))]
588 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
589 self.handle_timeline(
590 updates.timeline,
591 updates.ephemeral.clone(),
592 updates.ambiguity_changes,
593 )
594 .await?;
595 self.handle_account_data(updates.account_data);
596
597 Ok(())
598 }
599
600 #[instrument(skip_all, fields(room_id = %self.room_id))]
601 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
602 self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
603
604 Ok(())
605 }
606
607 async fn handle_timeline(
610 &self,
611 timeline: Timeline,
612 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
613 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
614 ) -> Result<()> {
615 if timeline.events.is_empty()
616 && timeline.prev_batch.is_none()
617 && ephemeral_events.is_empty()
618 && ambiguity_changes.is_empty()
619 {
620 return Ok(());
621 }
622
623 trace!("adding new events");
625
626 let (stored_prev_batch_token, timeline_event_diffs) =
627 self.state.write().await?.handle_sync(timeline).await?;
628
629 if stored_prev_batch_token {
632 self.pagination_batch_token_notifier.notify_one();
633 }
634
635 if !timeline_event_diffs.is_empty() {
638 let _ = self.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
639 diffs: timeline_event_diffs,
640 origin: EventsOrigin::Sync,
641 });
642
643 let _ = self
644 .generic_update_sender
645 .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
646 }
647
648 if !ephemeral_events.is_empty() {
649 let _ = self
650 .update_sender
651 .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
652 }
653
654 if !ambiguity_changes.is_empty() {
655 let _ =
656 self.update_sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
657 }
658
659 Ok(())
660 }
661}
662
663#[derive(Debug)]
666pub(super) enum LoadMoreEventsBackwardsOutcome {
667 Gap {
669 prev_token: Option<String>,
672 },
673
674 StartOfTimeline,
676
677 Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
679}
680
681mod private {
683 use std::{
684 collections::{BTreeMap, HashMap, HashSet},
685 sync::{
686 Arc, OnceLock,
687 atomic::{AtomicBool, AtomicUsize, Ordering},
688 },
689 };
690
691 use eyeball::SharedObservable;
692 use eyeball_im::VectorDiff;
693 use itertools::Itertools;
694 use matrix_sdk_base::{
695 apply_redaction,
696 deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
697 event_cache::{
698 Event, Gap,
699 store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState},
700 },
701 linked_chunk::{
702 ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
703 OwnedLinkedChunkId, Position, Update, lazy_loader,
704 },
705 serde_helpers::{extract_edit_target, extract_thread_root},
706 sync::Timeline,
707 };
708 use matrix_sdk_common::executor::spawn;
709 use ruma::{
710 EventId, OwnedEventId, OwnedRoomId, RoomId,
711 events::{
712 AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
713 relation::RelationType, room::redaction::SyncRoomRedactionEvent,
714 },
715 room_version_rules::RoomVersionRules,
716 serde::Raw,
717 };
718 use tokio::sync::{
719 Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
720 broadcast::{Receiver, Sender},
721 };
722 use tracing::{debug, error, instrument, trace, warn};
723
724 use super::{
725 super::{
726 BackPaginationOutcome, EventCacheError, RoomEventCacheLinkedChunkUpdate,
727 RoomPaginationStatus, ThreadEventCacheUpdate,
728 deduplicator::{DeduplicationOutcome, filter_duplicate_events},
729 room::threads::ThreadEventCache,
730 },
731 EventLocation, EventsOrigin, LoadMoreEventsBackwardsOutcome, RoomEventCacheGenericUpdate,
732 RoomEventCacheUpdate,
733 events::EventLinkedChunk,
734 sort_positions_descending,
735 };
736 use crate::{Room, event_cache::room::pinned_events::PinnedEventCache};
737
738 pub struct RoomEventCacheStateLock {
743 locked_state: RwLock<RoomEventCacheState>,
745
746 state_lock_upgrade_mutex: Mutex<()>,
752 }
753
754 struct RoomEventCacheState {
755 enabled_thread_support: bool,
757
758 room_id: OwnedRoomId,
760
761 store: EventCacheStoreLock,
763
764 room_linked_chunk: EventLinkedChunk,
767
768 threads: HashMap<OwnedEventId, ThreadEventCache>,
772
773 pinned_event_cache: OnceLock<PinnedEventCache>,
775
776 pagination_status: SharedObservable<RoomPaginationStatus>,
777
778 update_sender: Sender<RoomEventCacheUpdate>,
783
784 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
789
790 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
793
794 room_version_rules: RoomVersionRules,
796
797 waited_for_initial_prev_token: Arc<AtomicBool>,
802
803 subscriber_count: Arc<AtomicUsize>,
806 }
807
808 impl RoomEventCacheStateLock {
809 #[allow(clippy::too_many_arguments)]
820 pub async fn new(
821 room_id: OwnedRoomId,
822 room_version_rules: RoomVersionRules,
823 enabled_thread_support: bool,
824 update_sender: Sender<RoomEventCacheUpdate>,
825 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
826 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
827 store: EventCacheStoreLock,
828 pagination_status: SharedObservable<RoomPaginationStatus>,
829 ) -> Result<Self, EventCacheError> {
830 let store_guard = match store.lock().await? {
831 EventCacheStoreLockState::Clean(guard) => guard,
833
834 EventCacheStoreLockState::Dirty(guard) => {
837 EventCacheStoreLockGuard::clear_dirty(&guard);
838
839 guard
840 }
841 };
842
843 let linked_chunk_id = LinkedChunkId::Room(&room_id);
844
845 let full_linked_chunk_metadata =
850 match load_linked_chunk_metadata(&store_guard, linked_chunk_id).await {
851 Ok(metas) => metas,
852 Err(err) => {
853 error!(
854 "error when loading a linked chunk's metadata from the store: {err}"
855 );
856
857 store_guard
859 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
860 .await?;
861
862 None
864 }
865 };
866
867 let linked_chunk = match store_guard
868 .load_last_chunk(linked_chunk_id)
869 .await
870 .map_err(EventCacheError::from)
871 .and_then(|(last_chunk, chunk_identifier_generator)| {
872 lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
873 .map_err(EventCacheError::from)
874 }) {
875 Ok(linked_chunk) => linked_chunk,
876 Err(err) => {
877 error!(
878 "error when loading a linked chunk's latest chunk from the store: {err}"
879 );
880
881 store_guard
883 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
884 .await?;
885
886 None
887 }
888 };
889
890 let waited_for_initial_prev_token = Arc::new(AtomicBool::new(false));
891
892 Ok(Self {
893 locked_state: RwLock::new(RoomEventCacheState {
894 enabled_thread_support,
895 room_id,
896 store,
897 room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
898 linked_chunk,
899 full_linked_chunk_metadata,
900 ),
901 threads: HashMap::new(),
906 pagination_status,
907 update_sender,
908 generic_update_sender,
909 linked_chunk_update_sender,
910 room_version_rules,
911 waited_for_initial_prev_token,
912 subscriber_count: Default::default(),
913 pinned_event_cache: OnceLock::new(),
914 }),
915 state_lock_upgrade_mutex: Mutex::new(()),
916 })
917 }
918
919 pub async fn read(&self) -> Result<RoomEventCacheStateLockReadGuard<'_>, EventCacheError> {
929 let _state_lock_upgrade_guard = self.state_lock_upgrade_mutex.lock().await;
976
977 let state_guard = self.locked_state.read().await;
979
980 match state_guard.store.lock().await? {
981 EventCacheStoreLockState::Clean(store_guard) => {
982 Ok(RoomEventCacheStateLockReadGuard { state: state_guard, store: store_guard })
983 }
984 EventCacheStoreLockState::Dirty(store_guard) => {
985 drop(state_guard);
989 let state_guard = self.locked_state.write().await;
990
991 let mut guard = RoomEventCacheStateLockWriteGuard {
992 state: state_guard,
993 store: store_guard,
994 };
995
996 let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;
998
999 EventCacheStoreLockGuard::clear_dirty(&guard.store);
1001
1002 let guard = guard.downgrade();
1004
1005 if !updates_as_vector_diffs.is_empty() {
1007 let _ = guard.state.update_sender.send(
1009 RoomEventCacheUpdate::UpdateTimelineEvents {
1010 diffs: updates_as_vector_diffs,
1011 origin: EventsOrigin::Cache,
1012 },
1013 );
1014
1015 let _ =
1017 guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
1018 room_id: guard.state.room_id.clone(),
1019 });
1020 }
1021
1022 Ok(guard)
1023 }
1024 }
1025 }
1026
1027 pub async fn write(
1038 &self,
1039 ) -> Result<RoomEventCacheStateLockWriteGuard<'_>, EventCacheError> {
1040 let state_guard = self.locked_state.write().await;
1041
1042 match state_guard.store.lock().await? {
1043 EventCacheStoreLockState::Clean(store_guard) => {
1044 Ok(RoomEventCacheStateLockWriteGuard { state: state_guard, store: store_guard })
1045 }
1046 EventCacheStoreLockState::Dirty(store_guard) => {
1047 let mut guard = RoomEventCacheStateLockWriteGuard {
1048 state: state_guard,
1049 store: store_guard,
1050 };
1051
1052 let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;
1054
1055 EventCacheStoreLockGuard::clear_dirty(&guard.store);
1057
1058 if !updates_as_vector_diffs.is_empty() {
1060 let _ = guard.state.update_sender.send(
1062 RoomEventCacheUpdate::UpdateTimelineEvents {
1063 diffs: updates_as_vector_diffs,
1064 origin: EventsOrigin::Cache,
1065 },
1066 );
1067
1068 let _ =
1070 guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
1071 room_id: guard.state.room_id.clone(),
1072 });
1073 }
1074
1075 Ok(guard)
1076 }
1077 }
1078 }
1079 }
1080
1081 pub struct RoomEventCacheStateLockReadGuard<'a> {
1083 state: RwLockReadGuard<'a, RoomEventCacheState>,
1085
1086 store: EventCacheStoreLockGuard,
1088 }
1089
1090 pub struct RoomEventCacheStateLockWriteGuard<'a> {
1092 state: RwLockWriteGuard<'a, RoomEventCacheState>,
1094
1095 store: EventCacheStoreLockGuard,
1097 }
1098
1099 impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
1100 fn downgrade(self) -> RoomEventCacheStateLockReadGuard<'a> {
1108 RoomEventCacheStateLockReadGuard { state: self.state.downgrade(), store: self.store }
1109 }
1110 }
1111
1112 impl<'a> RoomEventCacheStateLockReadGuard<'a> {
1113 pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
1115 &self.state.room_linked_chunk
1116 }
1117
1118 pub fn subscriber_count(&self) -> &Arc<AtomicUsize> {
1119 &self.state.subscriber_count
1120 }
1121
1122 pub async fn find_event(
1127 &self,
1128 event_id: &EventId,
1129 ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1130 find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1131 .await
1132 }
1133
1134 pub async fn find_event_with_relations(
1148 &self,
1149 event_id: &EventId,
1150 filters: Option<Vec<RelationType>>,
1151 ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1152 find_event_with_relations(
1153 event_id,
1154 &self.state.room_id,
1155 filters,
1156 &self.state.room_linked_chunk,
1157 &self.store,
1158 )
1159 .await
1160 }
1161
1162 pub async fn find_event_relations(
1176 &self,
1177 event_id: &EventId,
1178 filters: Option<Vec<RelationType>>,
1179 ) -> Result<Vec<Event>, EventCacheError> {
1180 find_event_relations(
1181 event_id,
1182 &self.state.room_id,
1183 filters,
1184 &self.state.room_linked_chunk,
1185 &self.store,
1186 )
1187 .await
1188 }
1189
1190 pub fn rfind_map_event_in_memory_by<'i, O, P>(&'i self, mut predicate: P) -> Option<O>
1199 where
1200 P: FnMut(&'i Event, Option<&'i Event>) -> Option<O>,
1201 {
1202 self.state
1203 .room_linked_chunk
1204 .revents()
1205 .peekable()
1206 .batching(|iter| {
1207 iter.next().map(|(_position, event)| {
1208 (event, iter.peek().map(|(_next_position, next_event)| *next_event))
1209 })
1210 })
1211 .find_map(|(event, next_event)| predicate(event, next_event))
1212 }
1213
1214 #[cfg(test)]
1215 pub fn is_dirty(&self) -> bool {
1216 EventCacheStoreLockGuard::is_dirty(&self.store)
1217 }
1218 }
1219
1220 impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
1221 #[cfg(any(feature = "e2e-encryption", test))]
1223 pub fn room_linked_chunk(&mut self) -> &mut EventLinkedChunk {
1224 &mut self.state.room_linked_chunk
1225 }
1226
1227 #[cfg(any(feature = "e2e-encryption", test))]
1230 pub fn pinned_event_cache(&self) -> Option<&PinnedEventCache> {
1231 self.state.pinned_event_cache.get()
1232 }
1233
1234 pub fn waited_for_initial_prev_token(&self) -> &Arc<AtomicBool> {
1236 &self.state.waited_for_initial_prev_token
1237 }
1238
1239 pub async fn find_event(
1244 &self,
1245 event_id: &EventId,
1246 ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1247 find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1248 .await
1249 }
1250
1251 pub async fn find_event_with_relations(
1265 &self,
1266 event_id: &EventId,
1267 filters: Option<Vec<RelationType>>,
1268 ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1269 find_event_with_relations(
1270 event_id,
1271 &self.state.room_id,
1272 filters,
1273 &self.state.room_linked_chunk,
1274 &self.store,
1275 )
1276 .await
1277 }
1278
1279 pub async fn load_more_events_backwards(
1281 &mut self,
1282 ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
1283 if let Some(prev_token) = self.state.room_linked_chunk.rgap().map(|gap| gap.prev_token)
1286 {
1287 return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
1288 }
1289
1290 let prev_first_chunk = self
1291 .state
1292 .room_linked_chunk
1293 .chunks()
1294 .next()
1295 .expect("a linked chunk is never empty");
1296
1297 let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
1299 let new_first_chunk = match self
1300 .store
1301 .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
1302 .await
1303 {
1304 Ok(Some(new_first_chunk)) => {
1305 new_first_chunk
1307 }
1308
1309 Ok(None) => {
1310 if self.state.room_linked_chunk.events().next().is_some() {
1315 trace!("chunk is fully loaded and non-empty: reached_start=true");
1318 return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
1319 }
1320
1321 return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: None });
1323 }
1324
1325 Err(err) => {
1326 error!("error when loading the previous chunk of a linked chunk: {err}");
1327
1328 self.store
1330 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
1331 .await?;
1332
1333 return Err(err.into());
1335 }
1336 };
1337
1338 let chunk_content = new_first_chunk.content.clone();
1339
1340 let reached_start = new_first_chunk.previous.is_none();
1346
1347 if let Err(err) =
1348 self.state.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk)
1349 {
1350 error!("error when inserting the previous chunk into its linked chunk: {err}");
1351
1352 self.store
1354 .handle_linked_chunk_updates(
1355 LinkedChunkId::Room(&self.state.room_id),
1356 vec![Update::Clear],
1357 )
1358 .await?;
1359
1360 return Err(err.into());
1362 }
1363
1364 let _ = self.state.room_linked_chunk.store_updates().take();
1367
1368 let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1370
1371 Ok(match chunk_content {
1372 ChunkContent::Gap(gap) => {
1373 trace!("reloaded chunk from disk (gap)");
1374 LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
1375 }
1376
1377 ChunkContent::Items(events) => {
1378 trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
1379 LoadMoreEventsBackwardsOutcome::Events {
1380 events,
1381 timeline_event_diffs,
1382 reached_start,
1383 }
1384 }
1385 })
1386 }
1387
1388 pub async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
1397 let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
1399 let (last_chunk, chunk_identifier_generator) =
1400 match self.store.load_last_chunk(linked_chunk_id).await {
1401 Ok(pair) => pair,
1402
1403 Err(err) => {
1404 error!("error when reloading a linked chunk from memory: {err}");
1406
1407 self.store
1409 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
1410 .await?;
1411
1412 (None, ChunkIdentifierGenerator::new_from_scratch())
1414 }
1415 };
1416
1417 debug!("unloading the linked chunk, and resetting it to its last chunk");
1418
1419 if let Err(err) =
1422 self.state.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
1423 {
1424 error!("error when replacing the linked chunk: {err}");
1425 return self.reset_internal().await;
1426 }
1427
1428 self.state
1432 .pagination_status
1433 .set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1434
1435 let _ = self.state.room_linked_chunk.store_updates().take();
1438
1439 Ok(())
1440 }
1441
1442 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1445 pub async fn auto_shrink_if_no_subscribers(
1446 &mut self,
1447 ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
1448 let subscriber_count = self.state.subscriber_count.load(Ordering::SeqCst);
1449
1450 trace!(subscriber_count, "received request to auto-shrink");
1451
1452 if subscriber_count == 0 {
1453 self.shrink_to_last_chunk().await?;
1456
1457 Ok(Some(self.state.room_linked_chunk.updates_as_vector_diffs()))
1458 } else {
1459 Ok(None)
1460 }
1461 }
1462
1463 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1465 pub async fn force_shrink_to_last_chunk(
1466 &mut self,
1467 ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1468 self.shrink_to_last_chunk().await?;
1469
1470 Ok(self.state.room_linked_chunk.updates_as_vector_diffs())
1471 }
1472
1473 #[instrument(skip_all)]
1479 pub async fn remove_events(
1480 &mut self,
1481 in_memory_events: Vec<(OwnedEventId, Position)>,
1482 in_store_events: Vec<(OwnedEventId, Position)>,
1483 ) -> Result<(), EventCacheError> {
1484 if !in_store_events.is_empty() {
1486 let mut positions = in_store_events
1487 .into_iter()
1488 .map(|(_event_id, position)| position)
1489 .collect::<Vec<_>>();
1490
1491 sort_positions_descending(&mut positions);
1492
1493 let updates = positions
1494 .into_iter()
1495 .map(|pos| Update::RemoveItem { at: pos })
1496 .collect::<Vec<_>>();
1497
1498 self.apply_store_only_updates(updates).await?;
1499 }
1500
1501 if in_memory_events.is_empty() {
1503 return Ok(());
1505 }
1506
1507 self.state
1509 .room_linked_chunk
1510 .remove_events_by_position(
1511 in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
1512 )
1513 .expect("failed to remove an event");
1514
1515 self.propagate_changes().await
1516 }
1517
1518 async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1519 let updates = self.state.room_linked_chunk.store_updates().take();
1520 self.send_updates_to_store(updates).await
1521 }
1522
1523 async fn apply_store_only_updates(
1530 &mut self,
1531 updates: Vec<Update<Event, Gap>>,
1532 ) -> Result<(), EventCacheError> {
1533 self.state.room_linked_chunk.order_tracker.map_updates(&updates);
1534 self.send_updates_to_store(updates).await
1535 }
1536
1537 async fn send_updates_to_store(
1538 &mut self,
1539 mut updates: Vec<Update<Event, Gap>>,
1540 ) -> Result<(), EventCacheError> {
1541 if updates.is_empty() {
1542 return Ok(());
1543 }
1544
1545 for update in updates.iter_mut() {
1547 match update {
1548 Update::PushItems { items, .. } => strip_relations_from_events(items),
1549 Update::ReplaceItem { item, .. } => strip_relations_from_event(item),
1550 Update::NewItemsChunk { .. }
1552 | Update::NewGapChunk { .. }
1553 | Update::RemoveChunk(_)
1554 | Update::RemoveItem { .. }
1555 | Update::DetachLastItems { .. }
1556 | Update::StartReattachItems
1557 | Update::EndReattachItems
1558 | Update::Clear => {}
1559 }
1560 }
1561
1562 let store = self.store.clone();
1569 let room_id = self.state.room_id.clone();
1570 let cloned_updates = updates.clone();
1571
1572 spawn(async move {
1573 trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
1574 let linked_chunk_id = LinkedChunkId::Room(&room_id);
1575 store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
1576 trace!("linked chunk updates applied");
1577
1578 super::Result::Ok(())
1579 })
1580 .await
1581 .expect("joining failed")?;
1582
1583 let _ = self.state.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
1585 linked_chunk_id: OwnedLinkedChunkId::Room(self.state.room_id.clone()),
1586 updates,
1587 });
1588
1589 Ok(())
1590 }
1591
1592 pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1598 self.reset_internal().await?;
1599
1600 let diff_updates = self.state.room_linked_chunk.updates_as_vector_diffs();
1601
1602 debug_assert_eq!(diff_updates.len(), 1);
1604 debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1605
1606 Ok(diff_updates)
1607 }
1608
1609 async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
1610 self.state.room_linked_chunk.reset();
1611
1612 for thread in self.state.threads.values_mut() {
1617 thread.clear();
1618 }
1619
1620 self.propagate_changes().await?;
1621
1622 self.state.waited_for_initial_prev_token.store(false, Ordering::SeqCst);
1626 self.state
1628 .pagination_status
1629 .set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1630
1631 Ok(())
1632 }
1633
1634 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1642 pub async fn handle_sync(
1643 &mut self,
1644 mut timeline: Timeline,
1645 ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
1646 let mut prev_batch = timeline.prev_batch.take();
1647
1648 let DeduplicationOutcome {
1649 all_events: events,
1650 in_memory_duplicated_event_ids,
1651 in_store_duplicated_event_ids,
1652 non_empty_all_duplicates: all_duplicates,
1653 } = filter_duplicate_events(
1654 &self.store,
1655 LinkedChunkId::Room(&self.state.room_id),
1656 &self.state.room_linked_chunk,
1657 timeline.events,
1658 )
1659 .await?;
1660
1661 if !timeline.limited && self.state.room_linked_chunk.events().next().is_some()
1674 || all_duplicates
1675 {
1676 prev_batch = None;
1677 }
1678
1679 if prev_batch.is_some() {
1680 let mut summaries_to_update = Vec::new();
1685
1686 for (thread_root, thread) in self.state.threads.iter_mut() {
1687 thread.clear();
1689
1690 summaries_to_update.push(thread_root.clone());
1691 }
1692
1693 for thread_root in summaries_to_update {
1697 let Some((location, mut target_event)) = self.find_event(&thread_root).await?
1698 else {
1699 trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
1700 continue;
1701 };
1702
1703 if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
1704 prev_summary.latest_reply = None;
1705
1706 target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);
1707
1708 self.replace_event_at(location, target_event).await?;
1709 }
1710 }
1711 }
1712
1713 if all_duplicates {
1714 return Ok((false, Vec::new()));
1717 }
1718
1719 let has_new_gap = prev_batch.is_some();
1720
1721 if !self.state.waited_for_initial_prev_token.load(Ordering::SeqCst) && has_new_gap {
1724 self.state.waited_for_initial_prev_token.store(true, Ordering::SeqCst);
1725 }
1726
1727 self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1732 .await?;
1733
1734 self.state
1735 .room_linked_chunk
1736 .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);
1737
1738 self.post_process_new_events(events, true).await?;
1739
1740 if timeline.limited && has_new_gap {
1741 self.shrink_to_last_chunk().await?;
1748 }
1749
1750 let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1751
1752 Ok((has_new_gap, timeline_event_diffs))
1753 }
1754
1755 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1763 pub async fn handle_backpagination(
1764 &mut self,
1765 events: Vec<Event>,
1766 mut new_token: Option<String>,
1767 prev_token: Option<String>,
1768 ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
1769 {
1770 let prev_gap_id = if let Some(token) = prev_token {
1773 let gap_chunk_id = self.state.room_linked_chunk.chunk_identifier(|chunk| {
1775 matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
1776 });
1777
1778 if gap_chunk_id.is_none() {
1779 return Ok(None);
1785 }
1786
1787 gap_chunk_id
1788 } else {
1789 None
1790 };
1791
1792 let DeduplicationOutcome {
1793 all_events: mut events,
1794 in_memory_duplicated_event_ids,
1795 in_store_duplicated_event_ids,
1796 non_empty_all_duplicates: all_duplicates,
1797 } = filter_duplicate_events(
1798 &self.store,
1799 LinkedChunkId::Room(&self.state.room_id),
1800 &self.state.room_linked_chunk,
1801 events,
1802 )
1803 .await?;
1804
1805 if !all_duplicates {
1819 self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1821 .await?;
1822 } else {
1823 events.clear();
1825 new_token = None;
1828 }
1829
1830 let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();
1833
1834 let new_gap = new_token.map(|prev_token| Gap { prev_token });
1835 let reached_start = self.state.room_linked_chunk.finish_back_pagination(
1836 prev_gap_id,
1837 new_gap,
1838 &topo_ordered_events,
1839 );
1840
1841 self.post_process_new_events(topo_ordered_events, false).await?;
1843
1844 let event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1845
1846 Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
1847 }
1848
1849 pub fn subscribe_to_thread(
1852 &mut self,
1853 root: OwnedEventId,
1854 ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
1855 self.get_or_reload_thread(root).subscribe()
1856 }
1857
1858 pub async fn subscribe_to_pinned_events(
1870 &mut self,
1871 room: Room,
1872 ) -> Result<(Vec<Event>, Receiver<RoomEventCacheUpdate>), EventCacheError> {
1873 let pinned_event_cache = self.state.pinned_event_cache.get_or_init(|| {
1874 PinnedEventCache::new(
1875 room,
1876 self.state.linked_chunk_update_sender.clone(),
1877 self.state.store.clone(),
1878 )
1879 });
1880
1881 pinned_event_cache.subscribe().await
1882 }
1883
1884 pub fn finish_thread_network_pagination(
1888 &mut self,
1889 root: OwnedEventId,
1890 prev_token: Option<String>,
1891 new_token: Option<String>,
1892 events: Vec<Event>,
1893 ) -> Option<BackPaginationOutcome> {
1894 self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
1895 }
1896
1897 pub fn load_more_thread_events_backwards(
1898 &mut self,
1899 root: OwnedEventId,
1900 ) -> LoadMoreEventsBackwardsOutcome {
1901 self.get_or_reload_thread(root).load_more_events_backwards()
1902 }
1903
1904 pub(in super::super) async fn post_process_new_events(
1913 &mut self,
1914 events: Vec<Event>,
1915 is_sync: bool,
1916 ) -> Result<(), EventCacheError> {
1917 self.propagate_changes().await?;
1919
1920 let state = &mut *self.state;
1923
1924 if let Some(pinned_event_cache) = state.pinned_event_cache.get_mut() {
1925 pinned_event_cache
1926 .maybe_add_live_related_events(&events, &state.room_version_rules.redaction)
1927 .await?;
1928 }
1929
1930 let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();
1931
1932 for event in events {
1933 self.maybe_apply_new_redaction(&event).await?;
1934
1935 if self.state.enabled_thread_support {
1936 if is_sync {
1941 if let Some(thread_root) = extract_thread_root(event.raw()) {
1942 new_events_by_thread
1943 .entry(thread_root)
1944 .or_default()
1945 .push(event.clone());
1946 } else if let Some(event_id) = event.event_id() {
1947 if self.state.threads.contains_key(&event_id) {
1949 new_events_by_thread
1950 .entry(event_id)
1951 .or_default()
1952 .push(event.clone());
1953 }
1954 }
1955 }
1956
1957 if let Some(edit_target) = extract_edit_target(event.raw()) {
1959 if let Some((_location, edit_target_event)) =
1961 self.find_event(&edit_target).await?
1962 && let Some(thread_root) = extract_thread_root(edit_target_event.raw())
1963 {
1964 new_events_by_thread.entry(thread_root).or_default();
1967 }
1968 }
1969 }
1970
1971 if let Some(bundled_thread) = event.bundled_latest_thread_event {
1973 self.save_events([*bundled_thread]).await?;
1974 }
1975 }
1976
1977 if self.state.enabled_thread_support {
1978 self.update_threads(new_events_by_thread).await?;
1979 }
1980
1981 Ok(())
1982 }
1983
1984 fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
1985 let room_id = self.state.room_id.clone();
1988 let linked_chunk_update_sender = self.state.linked_chunk_update_sender.clone();
1989
1990 self.state.threads.entry(root_event_id.clone()).or_insert_with(|| {
1991 ThreadEventCache::new(room_id, root_event_id, linked_chunk_update_sender)
1992 })
1993 }
1994
1995 #[instrument(skip_all)]
1996 async fn update_threads(
1997 &mut self,
1998 new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
1999 ) -> Result<(), EventCacheError> {
2000 for (thread_root, new_events) in new_events_by_thread {
2001 let thread_cache = self.get_or_reload_thread(thread_root.clone());
2002
2003 thread_cache.add_live_events(new_events);
2004
2005 let mut latest_event_id = thread_cache.latest_event_id();
2006
2007 if let Some(event_id) = latest_event_id.as_ref()
2010 && let Some((_, edits)) = self
2011 .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
2012 .await?
2013 && let Some(latest_edit) = edits.last()
2014 {
2015 latest_event_id = latest_edit.event_id();
2016 }
2017
2018 self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
2019 }
2020
2021 Ok(())
2022 }
2023
2024 async fn maybe_update_thread_summary(
2026 &mut self,
2027 thread_root: OwnedEventId,
2028 latest_event_id: Option<OwnedEventId>,
2029 ) -> Result<(), EventCacheError> {
2030 let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
2034 trace!(%thread_root, "thread root event is missing from the room linked chunk");
2035 return Ok(());
2036 };
2037
2038 let prev_summary = target_event.thread_summary.summary();
2039
2040 let num_replies = {
2049 let thread_replies = self
2050 .store
2051 .find_event_relations(
2052 &self.state.room_id,
2053 &thread_root,
2054 Some(&[RelationType::Thread]),
2055 )
2056 .await?;
2057 thread_replies.len().try_into().unwrap_or(u32::MAX)
2058 };
2059
2060 let new_summary = if num_replies > 0 {
2061 Some(ThreadSummary { num_replies, latest_reply: latest_event_id })
2062 } else {
2063 None
2064 };
2065
2066 if prev_summary == new_summary.as_ref() {
2067 trace!(%thread_root, "thread summary is already up-to-date");
2068 return Ok(());
2069 }
2070
2071 trace!(%thread_root, "updating thread summary: {new_summary:?}");
2073 target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary);
2074 self.replace_event_at(location, target_event).await
2075 }
2076
2077 pub(crate) async fn replace_event_at(
2084 &mut self,
2085 location: EventLocation,
2086 event: Event,
2087 ) -> Result<(), EventCacheError> {
2088 match location {
2089 EventLocation::Memory(position) => {
2090 self.state
2091 .room_linked_chunk
2092 .replace_event_at(position, event)
2093 .expect("should have been a valid position of an item");
2094 self.propagate_changes().await?;
2097 }
2098 EventLocation::Store => {
2099 self.save_events([event]).await?;
2100 }
2101 }
2102
2103 Ok(())
2104 }
2105
2106 #[instrument(skip_all)]
2110 async fn maybe_apply_new_redaction(
2111 &mut self,
2112 event: &Event,
2113 ) -> Result<(), EventCacheError> {
2114 let raw_event = event.raw();
2115
2116 let Ok(Some(MessageLikeEventType::RoomRedaction)) =
2119 raw_event.get_field::<MessageLikeEventType>("type")
2120 else {
2121 return Ok(());
2122 };
2123
2124 let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
2127 redaction,
2128 ))) = raw_event.deserialize()
2129 else {
2130 return Ok(());
2131 };
2132
2133 let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else {
2134 warn!("missing target event id from the redaction event");
2135 return Ok(());
2136 };
2137
2138 let Some((location, mut target_event)) = self.find_event(event_id).await? else {
2140 trace!("redacted event is missing from the linked chunk");
2141 return Ok(());
2142 };
2143
2144 let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
2146 match deserialized {
2149 AnySyncTimelineEvent::MessageLike(ev) => {
2150 if ev.is_redacted() {
2151 return Ok(());
2152 }
2153 }
2154 AnySyncTimelineEvent::State(ev) => {
2155 if ev.is_redacted() {
2156 return Ok(());
2157 }
2158 }
2159 }
2160
2161 extract_thread_root(target_event.raw())
2164 } else {
2165 warn!("failed to deserialize the event to redact");
2166 None
2167 };
2168
2169 if let Some(redacted_event) = apply_redaction(
2170 target_event.raw(),
2171 event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
2172 &self.state.room_version_rules.redaction,
2173 ) {
2174 target_event.replace_raw(redacted_event.cast_unchecked());
2179
2180 self.replace_event_at(location, target_event).await?;
2181
2182 if let Some(thread_root) = thread_root
2190 && let Some(thread_cache) = self.state.threads.get_mut(&thread_root)
2191 {
2192 thread_cache.remove_if_present(event_id);
2193
2194 let latest_event_id = thread_cache.latest_event_id();
2197 self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
2198 }
2199 }
2200
2201 Ok(())
2202 }
2203
2204 pub async fn save_events(
2206 &mut self,
2207 events: impl IntoIterator<Item = Event>,
2208 ) -> Result<(), EventCacheError> {
2209 let store = self.store.clone();
2210 let room_id = self.state.room_id.clone();
2211 let events = events.into_iter().collect::<Vec<_>>();
2212
2213 spawn(async move {
2215 for event in events {
2216 store.save_event(&room_id, event).await?;
2217 }
2218 super::Result::Ok(())
2219 })
2220 .await
2221 .expect("joining failed")?;
2222
2223 Ok(())
2224 }
2225
2226 #[cfg(test)]
2227 pub fn is_dirty(&self) -> bool {
2228 EventCacheStoreLockGuard::is_dirty(&self.store)
2229 }
2230 }
2231
2232 async fn load_linked_chunk_metadata(
2238 store_guard: &EventCacheStoreLockGuard,
2239 linked_chunk_id: LinkedChunkId<'_>,
2240 ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
2241 let mut all_chunks = store_guard
2242 .load_all_chunks_metadata(linked_chunk_id)
2243 .await
2244 .map_err(EventCacheError::from)?;
2245
2246 if all_chunks.is_empty() {
2247 return Ok(None);
2249 }
2250
2251 let chunk_map: HashMap<_, _> =
2253 all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();
2254
2255 let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
2257 let Some(last) = iter.next() else {
2258 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2259 details: "no last chunk found".to_owned(),
2260 });
2261 };
2262
2263 if let Some(other_last) = iter.next() {
2265 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2266 details: format!(
2267 "chunks {} and {} both claim to be last chunks",
2268 last.identifier.index(),
2269 other_last.identifier.index()
2270 ),
2271 });
2272 }
2273
2274 let mut seen = HashSet::new();
2277 let mut current = last;
2278 loop {
2279 if !seen.insert(current.identifier) {
2281 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2282 details: format!(
2283 "cycle detected in linked chunk at {}",
2284 current.identifier.index()
2285 ),
2286 });
2287 }
2288
2289 let Some(prev_id) = current.previous else {
2290 if seen.len() != all_chunks.len() {
2292 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2293 details: format!(
2294 "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
2295 seen.len(),
2296 all_chunks.len()
2297 ),
2298 });
2299 }
2300 break;
2301 };
2302
2303 let Some(pred_meta) = chunk_map.get(&prev_id) else {
2306 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2307 details: format!(
2308 "missing predecessor {} chunk for {}",
2309 prev_id.index(),
2310 current.identifier.index()
2311 ),
2312 });
2313 };
2314
2315 if pred_meta.next != Some(current.identifier) {
2317 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2318 details: format!(
2319 "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
2320 pred_meta.identifier.index(),
2321 pred_meta.next.map(|chunk_id| chunk_id.index()),
2322 current.identifier.index()
2323 ),
2324 });
2325 }
2326
2327 current = *pred_meta;
2328 }
2329
2330 let mut current = current.identifier;
2338 for i in 0..all_chunks.len() {
2339 let j = all_chunks
2341 .iter()
2342 .rev()
2343 .position(|meta| meta.identifier == current)
2344 .map(|j| all_chunks.len() - 1 - j)
2345 .expect("the target chunk must be present in the metadata");
2346 if i != j {
2347 all_chunks.swap(i, j);
2348 }
2349 if let Some(next) = all_chunks[i].next {
2350 current = next;
2351 }
2352 }
2353
2354 Ok(Some(all_chunks))
2355 }
2356
2357 fn strip_relations_if_present<T>(event: &mut Raw<T>) {
2361 let mut closure = || -> Option<()> {
2365 let mut val: serde_json::Value = event.deserialize_as().ok()?;
2366 let unsigned = val.get_mut("unsigned")?;
2367 let unsigned_obj = unsigned.as_object_mut()?;
2368 if unsigned_obj.remove("m.relations").is_some() {
2369 *event = Raw::new(&val).ok()?.cast_unchecked();
2370 }
2371 None
2372 };
2373 let _ = closure();
2374 }
2375
2376 fn strip_relations_from_event(ev: &mut Event) {
2377 match &mut ev.kind {
2378 TimelineEventKind::Decrypted(decrypted) => {
2379 decrypted.unsigned_encryption_info = None;
2382
2383 strip_relations_if_present(&mut decrypted.event);
2385 }
2386
2387 TimelineEventKind::UnableToDecrypt { event, .. }
2388 | TimelineEventKind::PlainText { event } => {
2389 strip_relations_if_present(event);
2390 }
2391 }
2392 }
2393
2394 fn strip_relations_from_events(items: &mut [Event]) {
2396 for ev in items.iter_mut() {
2397 strip_relations_from_event(ev);
2398 }
2399 }
2400
2401 async fn find_event(
2404 event_id: &EventId,
2405 room_id: &RoomId,
2406 room_linked_chunk: &EventLinkedChunk,
2407 store: &EventCacheStoreLockGuard,
2408 ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
2409 for (position, event) in room_linked_chunk.revents() {
2412 if event.event_id().as_deref() == Some(event_id) {
2413 return Ok(Some((EventLocation::Memory(position), event.clone())));
2414 }
2415 }
2416
2417 Ok(store.find_event(room_id, event_id).await?.map(|event| (EventLocation::Store, event)))
2418 }
2419
2420 async fn find_event_with_relations(
2424 event_id: &EventId,
2425 room_id: &RoomId,
2426 filters: Option<Vec<RelationType>>,
2427 room_linked_chunk: &EventLinkedChunk,
2428 store: &EventCacheStoreLockGuard,
2429 ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
2430 let found = store.find_event(room_id, event_id).await?;
2432
2433 let Some(target) = found else {
2434 return Ok(None);
2436 };
2437
2438 let related =
2440 find_event_relations(event_id, room_id, filters, room_linked_chunk, store).await?;
2441
2442 Ok(Some((target, related)))
2443 }
2444
2445 async fn find_event_relations(
2448 event_id: &EventId,
2449 room_id: &RoomId,
2450 filters: Option<Vec<RelationType>>,
2451 room_linked_chunk: &EventLinkedChunk,
2452 store: &EventCacheStoreLockGuard,
2453 ) -> Result<Vec<Event>, EventCacheError> {
2454 let mut related = store.find_event_relations(room_id, event_id, filters.as_deref()).await?;
2457 let mut stack =
2458 related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();
2459
2460 let mut already_seen = HashSet::new();
2463 already_seen.insert(event_id.to_owned());
2464
2465 let mut num_iters = 1;
2466
2467 while let Some(event_id) = stack.pop() {
2469 if !already_seen.insert(event_id.clone()) {
2470 continue;
2472 }
2473
2474 let other_related =
2475 store.find_event_relations(room_id, &event_id, filters.as_deref()).await?;
2476
2477 stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
2478 related.extend(other_related);
2479
2480 num_iters += 1;
2481 }
2482
2483 trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
2484
2485 related.sort_by(|(_, lhs), (_, rhs)| {
2489 use std::cmp::Ordering;
2490
2491 match (lhs, rhs) {
2492 (None, None) => Ordering::Equal,
2493 (None, Some(_)) => Ordering::Less,
2494 (Some(_), None) => Ordering::Greater,
2495 (Some(lhs), Some(rhs)) => {
2496 let lhs = room_linked_chunk.event_order(*lhs);
2497 let rhs = room_linked_chunk.event_order(*rhs);
2498
2499 match (lhs, rhs) {
2503 (None, None) => Ordering::Equal,
2504 (None, Some(_)) => Ordering::Less,
2505 (Some(_), None) => Ordering::Greater,
2506 (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
2507 }
2508 }
2509 }
2510 });
2511
2512 let related = related.into_iter().map(|(event, _pos)| event).collect();
2514
2515 Ok(related)
2516 }
2517}
2518
2519pub(super) enum EventLocation {
2521 Memory(Position),
2523
2524 Store,
2526}
2527
2528pub(super) use private::RoomEventCacheStateLock;
2529
2530#[cfg(test)]
2531mod tests {
2532 use matrix_sdk_base::event_cache::Event;
2533 use matrix_sdk_test::{async_test, event_factory::EventFactory};
2534 use ruma::{
2535 RoomId, event_id,
2536 events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2537 room_id, user_id,
2538 };
2539
2540 use crate::test_utils::logged_in_client;
2541
2542 #[async_test]
2543 async fn test_find_event_by_id_with_edit_relation() {
2544 let original_id = event_id!("$original");
2545 let related_id = event_id!("$related");
2546 let room_id = room_id!("!galette:saucisse.bzh");
2547 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2548
2549 assert_relations(
2550 room_id,
2551 f.text_msg("Original event").event_id(original_id).into(),
2552 f.text_msg("* An edited event")
2553 .edit(
2554 original_id,
2555 RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
2556 )
2557 .event_id(related_id)
2558 .into(),
2559 f,
2560 )
2561 .await;
2562 }
2563
2564 #[async_test]
2565 async fn test_find_event_by_id_with_thread_reply_relation() {
2566 let original_id = event_id!("$original");
2567 let related_id = event_id!("$related");
2568 let room_id = room_id!("!galette:saucisse.bzh");
2569 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2570
2571 assert_relations(
2572 room_id,
2573 f.text_msg("Original event").event_id(original_id).into(),
2574 f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
2575 f,
2576 )
2577 .await;
2578 }
2579
2580 #[async_test]
2581 async fn test_find_event_by_id_with_reaction_relation() {
2582 let original_id = event_id!("$original");
2583 let related_id = event_id!("$related");
2584 let room_id = room_id!("!galette:saucisse.bzh");
2585 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2586
2587 assert_relations(
2588 room_id,
2589 f.text_msg("Original event").event_id(original_id).into(),
2590 f.reaction(original_id, ":D").event_id(related_id).into(),
2591 f,
2592 )
2593 .await;
2594 }
2595
2596 #[async_test]
2597 async fn test_find_event_by_id_with_poll_response_relation() {
2598 let original_id = event_id!("$original");
2599 let related_id = event_id!("$related");
2600 let room_id = room_id!("!galette:saucisse.bzh");
2601 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2602
2603 assert_relations(
2604 room_id,
2605 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2606 .event_id(original_id)
2607 .into(),
2608 f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
2609 f,
2610 )
2611 .await;
2612 }
2613
2614 #[async_test]
2615 async fn test_find_event_by_id_with_poll_end_relation() {
2616 let original_id = event_id!("$original");
2617 let related_id = event_id!("$related");
2618 let room_id = room_id!("!galette:saucisse.bzh");
2619 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2620
2621 assert_relations(
2622 room_id,
2623 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2624 .event_id(original_id)
2625 .into(),
2626 f.poll_end("Poll ended", original_id).event_id(related_id).into(),
2627 f,
2628 )
2629 .await;
2630 }
2631
2632 #[async_test]
2633 async fn test_find_event_by_id_with_filtered_relationships() {
2634 let original_id = event_id!("$original");
2635 let related_id = event_id!("$related");
2636 let associated_related_id = event_id!("$recursive_related");
2637 let room_id = room_id!("!galette:saucisse.bzh");
2638 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2639
2640 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2641 let related_event = event_factory
2642 .text_msg("* Edited event")
2643 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2644 .event_id(related_id)
2645 .into();
2646 let associated_related_event =
2647 event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
2648
2649 let client = logged_in_client(None).await;
2650
2651 let event_cache = client.event_cache();
2652 event_cache.subscribe().unwrap();
2653
2654 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2655 let room = client.get_room(room_id).unwrap();
2656
2657 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2658
2659 room_event_cache.save_events([original_event]).await;
2661
2662 room_event_cache.save_events([related_event]).await;
2664
2665 room_event_cache.save_events([associated_related_event]).await;
2667
2668 let filter = Some(vec![RelationType::Replacement]);
2669 let (event, related_events) = room_event_cache
2670 .find_event_with_relations(original_id, filter)
2671 .await
2672 .expect("Failed to find the event with relations")
2673 .expect("Event has no relation");
2674 let cached_event_id = event.event_id().unwrap();
2676 assert_eq!(cached_event_id, original_id);
2677
2678 assert_eq!(related_events.len(), 1);
2680
2681 let related_event_id = related_events[0].event_id().unwrap();
2682 assert_eq!(related_event_id, related_id);
2683
2684 let filter = Some(vec![RelationType::Thread]);
2686 let (event, related_events) = room_event_cache
2687 .find_event_with_relations(original_id, filter)
2688 .await
2689 .expect("Failed to find the event with relations")
2690 .expect("Event has no relation");
2691
2692 let cached_event_id = event.event_id().unwrap();
2694 assert_eq!(cached_event_id, original_id);
2695 assert!(related_events.is_empty());
2697 }
2698
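// Without a filter, relations are collected recursively: both the edit of the original
// event and the reaction to that edit are returned.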
2699 #[async_test]
2700 async fn test_find_event_by_id_with_recursive_relation() {
2701 let original_id = event_id!("$original");
2702 let related_id = event_id!("$related");
2703 let associated_related_id = event_id!("$recursive_related");
2704 let room_id = room_id!("!galette:saucisse.bzh");
2705 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2706
2707 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2708 let related_event = event_factory
2709 .text_msg("* Edited event")
2710 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2711 .event_id(related_id)
2712 .into();
2713 let associated_related_event =
2714 event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
2715
2716 let client = logged_in_client(None).await;
2717
2718 let event_cache = client.event_cache();
2719 event_cache.subscribe().unwrap();
2720
2721 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2722 let room = client.get_room(room_id).unwrap();
2723
2724 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2725
2726 room_event_cache.save_events([original_event]).await;
2728
2729 room_event_cache.save_events([related_event]).await;
2731
2732 room_event_cache.save_events([associated_related_event]).await;
2734
2735 let (event, related_events) = room_event_cache
2736 .find_event_with_relations(original_id, None)
2737 .await
2738 .expect("Failed to find the event with relations")
2739 .expect("Event has no relation");
2740 let cached_event_id = event.event_id().unwrap();
2742 assert_eq!(cached_event_id, original_id);
2743
2744 assert_eq!(related_events.len(), 2);
2746
2747 let related_event_id = related_events[0].event_id().unwrap();
2748 assert_eq!(related_event_id, related_id);
2749 let related_event_id = related_events[1].event_id().unwrap();
2750 assert_eq!(related_event_id, associated_related_id);
2751 }
2752
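// Helper for the relation tests above: saves an original, an unrelated and a related
// event, then checks that `find_event_with_relations` returns the original event along
// with only the related one.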
2753 async fn assert_relations(
2754 room_id: &RoomId,
2755 original_event: Event,
2756 related_event: Event,
2757 event_factory: EventFactory,
2758 ) {
2759 let client = logged_in_client(None).await;
2760
2761 let event_cache = client.event_cache();
2762 event_cache.subscribe().unwrap();
2763
2764 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2765 let room = client.get_room(room_id).unwrap();
2766
2767 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2768
2769 let original_event_id = original_event.event_id().unwrap();
2771 room_event_cache.save_events([original_event]).await;
2772
2773 let unrelated_id = event_id!("$2");
2775 room_event_cache
2776 .save_events([event_factory
2777 .text_msg("An unrelated event")
2778 .event_id(unrelated_id)
2779 .into()])
2780 .await;
2781
2782 let related_id = related_event.event_id().unwrap();
2784 room_event_cache.save_events([related_event]).await;
2785
2786 let (event, related_events) = room_event_cache
2787 .find_event_with_relations(&original_event_id, None)
2788 .await
2789 .expect("Failed to find the event with relations")
2790 .expect("Event has no relation");
2791 let cached_event_id = event.event_id().unwrap();
2793 assert_eq!(cached_event_id, original_event_id);
2794
2795 let related_event_id = related_events[0].event_id().unwrap();
2797 assert_eq!(related_event_id, related_id);
2798 }
2799}
2800
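// These tests are not compiled for wasm targets; they exercise storage-backed behavior
// and rely on `assert_let_timeout!`.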
2801 #[cfg(all(test, not(target_family = "wasm")))]
2802 mod timed_tests {
2803 use std::{ops::Not, sync::Arc};
2804
2805 use assert_matches::assert_matches;
2806 use assert_matches2::assert_let;
2807 use eyeball_im::VectorDiff;
2808 use futures_util::FutureExt;
2809 use matrix_sdk_base::{
2810 event_cache::{
2811 Gap,
2812 store::{EventCacheStore as _, MemoryStore},
2813 },
2814 linked_chunk::{
2815 ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
2816 lazy_loader::from_all_chunks,
2817 },
2818 store::StoreConfig,
2819 sync::{JoinedRoomUpdate, Timeline},
2820 };
2821 use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
2822 use ruma::{
2823 EventId, OwnedUserId, event_id,
2824 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2825 room_id, user_id,
2826 };
2827 use tokio::task::yield_now;
2828
2829 use super::RoomEventCacheGenericUpdate;
2830 use crate::{
2831 assert_let_timeout,
2832 event_cache::{RoomEventCache, RoomEventCacheUpdate, room::LoadMoreEventsBackwardsOutcome},
2833 test_utils::client::MockClientBuilder,
2834 };
2835
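// Handling a sync for a room writes both the gap (from the `prev_batch` token) and the
// new events to the event cache store.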
2836 #[async_test]
2837 async fn test_write_to_storage() {
2838 let room_id = room_id!("!galette:saucisse.bzh");
2839 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2840
2841 let event_cache_store = Arc::new(MemoryStore::new());
2842
2843 let client = MockClientBuilder::new(None)
2844 .on_builder(|builder| {
2845 builder.store_config(
2846 StoreConfig::new("hodlor".to_owned())
2847 .event_cache_store(event_cache_store.clone()),
2848 )
2849 })
2850 .build()
2851 .await;
2852
2853 let event_cache = client.event_cache();
2854
2855 event_cache.subscribe().unwrap();
2857
2858 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2859 let room = client.get_room(room_id).unwrap();
2860
2861 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2862 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2863
2864 let timeline = Timeline {
2866 limited: true,
2867 prev_batch: Some("raclette".to_owned()),
2868 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
2869 };
2870
2871 room_event_cache
2872 .inner
2873 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2874 .await
2875 .unwrap();
2876
2877 assert_matches!(
2879 generic_stream.recv().await,
2880 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2881 assert_eq!(expected_room_id, room_id);
2882 }
2883 );
2884 assert!(generic_stream.is_empty());
2885
2886 let linked_chunk = from_all_chunks::<3, _, _>(
2888 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2889 )
2890 .unwrap()
2891 .unwrap();
2892
2893 assert_eq!(linked_chunk.chunks().count(), 2);
2894
2895 let mut chunks = linked_chunk.chunks();
2896
2897 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
2899 assert_eq!(gap.prev_token, "raclette");
2900 });
2901
2902 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2904 assert_eq!(events.len(), 1);
2905 let deserialized = events[0].raw().deserialize().unwrap();
2906 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
2907 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
2908 });
2909
2910 assert!(chunks.next().is_none());
2912 }
2913
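// Bundled relations are kept on the in-memory event, but stripped from the copy that is
// written to the event cache store.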
2914 #[async_test]
2915 async fn test_write_to_storage_strips_bundled_relations() {
2916 let room_id = room_id!("!galette:saucisse.bzh");
2917 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2918
2919 let event_cache_store = Arc::new(MemoryStore::new());
2920
2921 let client = MockClientBuilder::new(None)
2922 .on_builder(|builder| {
2923 builder.store_config(
2924 StoreConfig::new("hodlor".to_owned())
2925 .event_cache_store(event_cache_store.clone()),
2926 )
2927 })
2928 .build()
2929 .await;
2930
2931 let event_cache = client.event_cache();
2932
2933 event_cache.subscribe().unwrap();
2935
2936 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2937 let room = client.get_room(room_id).unwrap();
2938
2939 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2940 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2941
2942 let ev = f
2944 .text_msg("hey yo")
2945 .sender(*ALICE)
2946 .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
2947 .into_event();
2948
2949 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
2950
2951 room_event_cache
2952 .inner
2953 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2954 .await
2955 .unwrap();
2956
2957 assert_matches!(
2959 generic_stream.recv().await,
2960 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2961 assert_eq!(expected_room_id, room_id);
2962 }
2963 );
2964 assert!(generic_stream.is_empty());
2965
2966 {
2968 let events = room_event_cache.events().await.unwrap();
2969
2970 assert_eq!(events.len(), 1);
2971
2972 let ev = events[0].raw().deserialize().unwrap();
2973 assert_let!(
2974 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
2975 );
2976
2977 let original = msg.as_original().unwrap();
2978 assert_eq!(original.content.body(), "hey yo");
2979 assert!(original.unsigned.relations.replace.is_some());
2980 }
2981
2982 let linked_chunk = from_all_chunks::<3, _, _>(
2984 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2985 )
2986 .unwrap()
2987 .unwrap();
2988
2989 assert_eq!(linked_chunk.chunks().count(), 1);
2990
2991 let mut chunks = linked_chunk.chunks();
2992 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2993 assert_eq!(events.len(), 1);
2994
2995 let ev = events[0].raw().deserialize().unwrap();
2996 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
2997
2998 let original = msg.as_original().unwrap();
2999 assert_eq!(original.content.body(), "hey yo");
3000 assert!(original.unsigned.relations.replace.is_none());
3001 });
3002
3003 assert!(chunks.next().is_none());
3005 }
3006
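// Clearing a room's event cache empties the in-memory timeline, sends a `Clear` diff to
// subscribers, and removes the linked chunk from the store.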
3007 #[async_test]
3008 async fn test_clear() {
3009 let room_id = room_id!("!galette:saucisse.bzh");
3010 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
3011
3012 let event_cache_store = Arc::new(MemoryStore::new());
3013
3014 let event_id1 = event_id!("$1");
3015 let event_id2 = event_id!("$2");
3016
3017 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
3018 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
3019
3020 event_cache_store
3022 .handle_linked_chunk_updates(
3023 LinkedChunkId::Room(room_id),
3024 vec![
3025 Update::NewItemsChunk {
3027 previous: None,
3028 new: ChunkIdentifier::new(0),
3029 next: None,
3030 },
3031 Update::NewGapChunk {
3033 previous: Some(ChunkIdentifier::new(0)),
3034 new: ChunkIdentifier::new(42),
3036 next: None,
3037 gap: Gap { prev_token: "comté".to_owned() },
3038 },
3039 Update::NewItemsChunk {
3041 previous: Some(ChunkIdentifier::new(42)),
3042 new: ChunkIdentifier::new(1),
3043 next: None,
3044 },
3045 Update::PushItems {
3046 at: Position::new(ChunkIdentifier::new(1), 0),
3047 items: vec![ev1.clone()],
3048 },
3049 Update::NewItemsChunk {
3051 previous: Some(ChunkIdentifier::new(1)),
3052 new: ChunkIdentifier::new(2),
3053 next: None,
3054 },
3055 Update::PushItems {
3056 at: Position::new(ChunkIdentifier::new(2), 0),
3057 items: vec![ev2.clone()],
3058 },
3059 ],
3060 )
3061 .await
3062 .unwrap();
3063
3064 let client = MockClientBuilder::new(None)
3065 .on_builder(|builder| {
3066 builder.store_config(
3067 StoreConfig::new("hodlor".to_owned())
3068 .event_cache_store(event_cache_store.clone()),
3069 )
3070 })
3071 .build()
3072 .await;
3073
3074 let event_cache = client.event_cache();
3075
3076 event_cache.subscribe().unwrap();
3078
3079 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3080 let room = client.get_room(room_id).unwrap();
3081
3082 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3083
3084 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
3085 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3086
3087 {
3089 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
3090 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
3091 }
3092
3093 {
3095 assert_eq!(items.len(), 1);
3097 assert_eq!(items[0].event_id().unwrap(), event_id2);
3098
3099 assert!(stream.is_empty());
3100 }
3101
3102 {
3104 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3105
3106 assert_let_timeout!(
3107 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3108 );
3109 assert_eq!(diffs.len(), 1);
3110 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
3111 assert_eq!(event.event_id().unwrap(), event_id1);
3113 });
3114
3115 assert!(stream.is_empty());
3116
3117 assert_let_timeout!(
3118 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
3119 generic_stream.recv()
3120 );
3121 assert_eq!(room_id, expected_room_id);
3122 assert!(generic_stream.is_empty());
3123 }
3124
3125 room_event_cache.clear().await.unwrap();
3127
3128 assert_let_timeout!(
3130 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3131 );
3132 assert_eq!(diffs.len(), 1);
3133 assert_let!(VectorDiff::Clear = &diffs[0]);
3134
3135 assert_let_timeout!(
3137 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
3138 );
3139 assert_eq!(received_room_id, room_id);
3140 assert!(generic_stream.is_empty());
3141
3142 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
3145
3146 let items = room_event_cache.events().await.unwrap();
3148 assert!(items.is_empty());
3149
3150 let linked_chunk = from_all_chunks::<3, _, _>(
3152 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
3153 )
3154 .unwrap()
3155 .unwrap();
3156
3157 assert_eq!(linked_chunk.num_items(), 0);
3161 }
3162
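// A freshly created `RoomEventCache` only loads the most recent chunk from the store;
// older chunks come back through pagination, and re-receiving an already-known event via
// sync is deduplicated without emitting a new update.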
3163 #[async_test]
3164 async fn test_load_from_storage() {
3165 let room_id = room_id!("!galette:saucisse.bzh");
3166 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
3167
3168 let event_cache_store = Arc::new(MemoryStore::new());
3169
3170 let event_id1 = event_id!("$1");
3171 let event_id2 = event_id!("$2");
3172
3173 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
3174 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
3175
3176 event_cache_store
3178 .handle_linked_chunk_updates(
3179 LinkedChunkId::Room(room_id),
3180 vec![
3181 Update::NewItemsChunk {
3183 previous: None,
3184 new: ChunkIdentifier::new(0),
3185 next: None,
3186 },
3187 Update::NewGapChunk {
3189 previous: Some(ChunkIdentifier::new(0)),
3190 new: ChunkIdentifier::new(42),
3192 next: None,
3193 gap: Gap { prev_token: "cheddar".to_owned() },
3194 },
3195 Update::NewItemsChunk {
3197 previous: Some(ChunkIdentifier::new(42)),
3198 new: ChunkIdentifier::new(1),
3199 next: None,
3200 },
3201 Update::PushItems {
3202 at: Position::new(ChunkIdentifier::new(1), 0),
3203 items: vec![ev1.clone()],
3204 },
3205 Update::NewItemsChunk {
3207 previous: Some(ChunkIdentifier::new(1)),
3208 new: ChunkIdentifier::new(2),
3209 next: None,
3210 },
3211 Update::PushItems {
3212 at: Position::new(ChunkIdentifier::new(2), 0),
3213 items: vec![ev2.clone()],
3214 },
3215 ],
3216 )
3217 .await
3218 .unwrap();
3219
3220 let client = MockClientBuilder::new(None)
3221 .on_builder(|builder| {
3222 builder.store_config(
3223 StoreConfig::new("hodlor".to_owned())
3224 .event_cache_store(event_cache_store.clone()),
3225 )
3226 })
3227 .build()
3228 .await;
3229
3230 let event_cache = client.event_cache();
3231
3232 event_cache.subscribe().unwrap();
3234
3235 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3237
3238 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3239 let room = client.get_room(room_id).unwrap();
3240
3241 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3242
3243 assert_matches!(
3246 generic_stream.recv().await,
3247 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3248 assert_eq!(room_id, expected_room_id);
3249 }
3250 );
3251 assert!(generic_stream.is_empty());
3252
3253 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
3254
3255 assert_eq!(items.len(), 1);
3258 assert_eq!(items[0].event_id().unwrap(), event_id2);
3259 assert!(stream.is_empty());
3260
3261 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
3263 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
3264
3265 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3267
3268 assert_let_timeout!(
3269 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3270 );
3271 assert_eq!(diffs.len(), 1);
3272 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
3273 assert_eq!(event.event_id().unwrap(), event_id1);
3274 });
3275
3276 assert!(stream.is_empty());
3277
3278 assert_matches!(
3280 generic_stream.recv().await,
3281 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3282 assert_eq!(expected_room_id, room_id);
3283 }
3284 );
3285 assert!(generic_stream.is_empty());
3286
3287 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
3289
3290 room_event_cache
3291 .inner
3292 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
3293 .await
3294 .unwrap();
3295
3296 assert!(generic_stream.recv().now_or_never().is_none());
3299
3300 let items = room_event_cache.events().await.unwrap();
3305 assert_eq!(items.len(), 2);
3306 assert_eq!(items[0].event_id().unwrap(), event_id1);
3307 assert_eq!(items[1].event_id().unwrap(), event_id2);
3308 }
3309
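// If the linked chunk stored on disk is invalid, the event cache falls back to an empty
// in-memory timeline and purges the corrupted data from the store.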
3310 #[async_test]
3311 async fn test_load_from_storage_resilient_to_failure() {
3312 let room_id = room_id!("!fondue:patate.ch");
3313 let event_cache_store = Arc::new(MemoryStore::new());
3314
3315 let event = EventFactory::new()
3316 .room(room_id)
3317 .sender(user_id!("@ben:saucisse.bzh"))
3318 .text_msg("foo")
3319 .event_id(event_id!("$42"))
3320 .into_event();
3321
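// Deliberately store an inconsistent linked chunk: the second items chunk declares
// chunk 0 as its `next`, which cannot be reconciled when loading.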
3322 event_cache_store
3324 .handle_linked_chunk_updates(
3325 LinkedChunkId::Room(room_id),
3326 vec![
3327 Update::NewItemsChunk {
3328 previous: None,
3329 new: ChunkIdentifier::new(0),
3330 next: None,
3331 },
3332 Update::PushItems {
3333 at: Position::new(ChunkIdentifier::new(0), 0),
3334 items: vec![event],
3335 },
3336 Update::NewItemsChunk {
3337 previous: Some(ChunkIdentifier::new(0)),
3338 new: ChunkIdentifier::new(1),
3339 next: Some(ChunkIdentifier::new(0)),
3340 },
3341 ],
3342 )
3343 .await
3344 .unwrap();
3345
3346 let client = MockClientBuilder::new(None)
3347 .on_builder(|builder| {
3348 builder.store_config(
3349 StoreConfig::new("holder".to_owned())
3350 .event_cache_store(event_cache_store.clone()),
3351 )
3352 })
3353 .build()
3354 .await;
3355
3356 let event_cache = client.event_cache();
3357
3358 event_cache.subscribe().unwrap();
3360
3361 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3362 let room = client.get_room(room_id).unwrap();
3363
3364 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3365
3366 let items = room_event_cache.events().await.unwrap();
3367
3368 assert!(items.is_empty());
3371
3372 let raw_chunks =
3375 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
3376 assert!(raw_chunks.is_empty());
3377 }
3378
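// Gaps should only be kept when useful: the gap of a limited sync only shows up in
// memory once a backwards load needs it, and a non-limited sync's `prev_batch` doesn't
// introduce a gap at all.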
3379 #[async_test]
3380 async fn test_no_useless_gaps() {
3381 let room_id = room_id!("!galette:saucisse.bzh");
3382
3383 let client = MockClientBuilder::new(None).build().await;
3384
3385 let event_cache = client.event_cache();
3386 event_cache.subscribe().unwrap();
3387
3388 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3389 let room = client.get_room(room_id).unwrap();
3390 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3391 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3392
3393 let f = EventFactory::new().room(room_id).sender(*ALICE);
3394
3395 room_event_cache
3398 .inner
3399 .handle_joined_room_update(JoinedRoomUpdate {
3400 timeline: Timeline {
3401 limited: true,
3402 prev_batch: Some("raclette".to_owned()),
3403 events: vec![f.text_msg("hey yo").into_event()],
3404 },
3405 ..Default::default()
3406 })
3407 .await
3408 .unwrap();
3409
3410 assert_matches!(
3412 generic_stream.recv().await,
3413 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3414 assert_eq!(expected_room_id, room_id);
3415 }
3416 );
3417 assert!(generic_stream.is_empty());
3418
3419 {
3420 let mut state = room_event_cache.inner.state.write().await.unwrap();
3421
3422 let mut num_gaps = 0;
3423 let mut num_events = 0;
3424
3425 for c in state.room_linked_chunk().chunks() {
3426 match c.content() {
3427 ChunkContent::Items(items) => num_events += items.len(),
3428 ChunkContent::Gap(_) => num_gaps += 1,
3429 }
3430 }
3431
3432 assert_eq!(num_gaps, 0);
3435 assert_eq!(num_events, 1);
3436
3437 assert_matches!(
3439 state.load_more_events_backwards().await.unwrap(),
3440 LoadMoreEventsBackwardsOutcome::Gap { .. }
3441 );
3442
3443 num_gaps = 0;
3444 num_events = 0;
3445 for c in state.room_linked_chunk().chunks() {
3446 match c.content() {
3447 ChunkContent::Items(items) => num_events += items.len(),
3448 ChunkContent::Gap(_) => num_gaps += 1,
3449 }
3450 }
3451
3452 assert_eq!(num_gaps, 1);
3454 assert_eq!(num_events, 1);
3455 }
3456
3457 room_event_cache
3460 .inner
3461 .handle_joined_room_update(JoinedRoomUpdate {
3462 timeline: Timeline {
3463 limited: false,
3464 prev_batch: Some("fondue".to_owned()),
3465 events: vec![f.text_msg("sup").into_event()],
3466 },
3467 ..Default::default()
3468 })
3469 .await
3470 .unwrap();
3471
3472 assert_matches!(
3474 generic_stream.recv().await,
3475 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3476 assert_eq!(expected_room_id, room_id);
3477 }
3478 );
3479 assert!(generic_stream.is_empty());
3480
3481 {
3482 let state = room_event_cache.inner.state.read().await.unwrap();
3483
3484 let mut num_gaps = 0;
3485 let mut num_events = 0;
3486
3487 for c in state.room_linked_chunk().chunks() {
3488 match c.content() {
3489 ChunkContent::Items(items) => num_events += items.len(),
3490 ChunkContent::Gap(gap) => {
3491 assert_eq!(gap.prev_token, "raclette");
3492 num_gaps += 1;
3493 }
3494 }
3495 }
3496
3497 assert_eq!(num_gaps, 1);
3499 assert_eq!(num_events, 2);
3500 }
3501 }
3502
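// Force-shrinking the in-memory linked chunk to its last chunk emits a `Clear` followed
// by an `Append` of the remaining events; a later back-pagination reloads the previous
// chunk from the store.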
3503 #[async_test]
3504 async fn test_shrink_to_last_chunk() {
3505 let room_id = room_id!("!galette:saucisse.bzh");
3506
3507 let client = MockClientBuilder::new(None).build().await;
3508
3509 let f = EventFactory::new().room(room_id);
3510
3511 let evid1 = event_id!("$1");
3512 let evid2 = event_id!("$2");
3513
3514 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3515 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3516
3517 {
3519 client
3520 .event_cache_store()
3521 .lock()
3522 .await
3523 .expect("Could not acquire the event cache lock")
3524 .as_clean()
3525 .expect("Could not acquire a clean event cache lock")
3526 .handle_linked_chunk_updates(
3527 LinkedChunkId::Room(room_id),
3528 vec![
3529 Update::NewItemsChunk {
3530 previous: None,
3531 new: ChunkIdentifier::new(0),
3532 next: None,
3533 },
3534 Update::PushItems {
3535 at: Position::new(ChunkIdentifier::new(0), 0),
3536 items: vec![ev1],
3537 },
3538 Update::NewItemsChunk {
3539 previous: Some(ChunkIdentifier::new(0)),
3540 new: ChunkIdentifier::new(1),
3541 next: None,
3542 },
3543 Update::PushItems {
3544 at: Position::new(ChunkIdentifier::new(1), 0),
3545 items: vec![ev2],
3546 },
3547 ],
3548 )
3549 .await
3550 .unwrap();
3551 }
3552
3553 let event_cache = client.event_cache();
3554 event_cache.subscribe().unwrap();
3555
3556 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3557 let room = client.get_room(room_id).unwrap();
3558 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3559
3560 let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
3562 assert_eq!(events.len(), 1);
3563 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3564 assert!(stream.is_empty());
3565
3566 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3567
3568 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3570 assert_eq!(outcome.events.len(), 1);
3571 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3572 assert!(outcome.reached_start);
3573
3574 assert_let_timeout!(
3576 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3577 );
3578 assert_eq!(diffs.len(), 1);
3579 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3580 assert_eq!(value.event_id().as_deref(), Some(evid1));
3581 });
3582
3583 assert!(stream.is_empty());
3584
3585 assert_let_timeout!(
3587 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
3588 );
3589 assert_eq!(expected_room_id, room_id);
3590 assert!(generic_stream.is_empty());
3591
3592 let diffs = room_event_cache
3594 .inner
3595 .state
3596 .write()
3597 .await
3598 .unwrap()
3599 .force_shrink_to_last_chunk()
3600 .await
3601 .expect("shrinking should succeed");
3602
3603 assert_eq!(diffs.len(), 2);
3605 assert_matches!(&diffs[0], VectorDiff::Clear);
3606 assert_matches!(&diffs[1], VectorDiff::Append { values } => {
3607 assert_eq!(values.len(), 1);
3608 assert_eq!(values[0].event_id().as_deref(), Some(evid2));
3609 });
3610
3611 assert!(stream.is_empty());
3612
3613 assert!(generic_stream.is_empty());
3615
3616 let events = room_event_cache.events().await.unwrap();
3618 assert_eq!(events.len(), 1);
3619 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3620
3621 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3624 assert_eq!(outcome.events.len(), 1);
3625 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3626 assert!(outcome.reached_start);
3627 }
3628
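// `event_order` reflects the ordering of events across all chunks, including those not
// loaded in memory, and stays consistent after back-pagination.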
3629 #[async_test]
3630 async fn test_room_ordering() {
3631 let room_id = room_id!("!galette:saucisse.bzh");
3632
3633 let client = MockClientBuilder::new(None).build().await;
3634
3635 let f = EventFactory::new().room(room_id).sender(*ALICE);
3636
3637 let evid1 = event_id!("$1");
3638 let evid2 = event_id!("$2");
3639 let evid3 = event_id!("$3");
3640
3641 let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
3642 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3643 let ev3 = f.text_msg("yo").event_id(evid3).into_event();
3644
3645 {
3647 client
3648 .event_cache_store()
3649 .lock()
3650 .await
3651 .expect("Could not acquire the event cache lock")
3652 .as_clean()
3653 .expect("Could not acquire a clean event cache lock")
3654 .handle_linked_chunk_updates(
3655 LinkedChunkId::Room(room_id),
3656 vec![
3657 Update::NewItemsChunk {
3658 previous: None,
3659 new: ChunkIdentifier::new(0),
3660 next: None,
3661 },
3662 Update::PushItems {
3663 at: Position::new(ChunkIdentifier::new(0), 0),
3664 items: vec![ev1, ev2],
3665 },
3666 Update::NewItemsChunk {
3667 previous: Some(ChunkIdentifier::new(0)),
3668 new: ChunkIdentifier::new(1),
3669 next: None,
3670 },
3671 Update::PushItems {
3672 at: Position::new(ChunkIdentifier::new(1), 0),
3673 items: vec![ev3.clone()],
3674 },
3675 ],
3676 )
3677 .await
3678 .unwrap();
3679 }
3680
3681 let event_cache = client.event_cache();
3682 event_cache.subscribe().unwrap();
3683
3684 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3685 let room = client.get_room(room_id).unwrap();
3686 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3687
3688 {
3691 let state = room_event_cache.inner.state.read().await.unwrap();
3692 let room_linked_chunk = state.room_linked_chunk();
3693
3694 assert_eq!(
3696 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
3697 Some(0)
3698 );
3699
3700 assert_eq!(
3702 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
3703 Some(1)
3704 );
3705
3706 let mut events = room_linked_chunk.events();
3708 let (pos, ev) = events.next().unwrap();
3709 assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
3710 assert_eq!(ev.event_id().as_deref(), Some(evid3));
3711 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
3712
3713 assert!(events.next().is_none());
3715 }
3716
3717 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3719 assert!(outcome.reached_start);
3720
3721 {
3724 let state = room_event_cache.inner.state.read().await.unwrap();
3725 let room_linked_chunk = state.room_linked_chunk();
3726
3727 for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
3728 assert_eq!(room_linked_chunk.event_order(pos), Some(i));
3729 }
3730 }
3731
3732 let evid4 = event_id!("$4");
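// Receiving ev3 again through a limited sync deduplicates it: ev3 keeps its order, ev4
// is ordered right after it, and ev3's previous position no longer maps to any order.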
3737 room_event_cache
3738 .inner
3739 .handle_joined_room_update(JoinedRoomUpdate {
3740 timeline: Timeline {
3741 limited: true,
3742 prev_batch: Some("fondue".to_owned()),
3743 events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
3744 },
3745 ..Default::default()
3746 })
3747 .await
3748 .unwrap();
3749
3750 {
3751 let state = room_event_cache.inner.state.read().await.unwrap();
3752 let room_linked_chunk = state.room_linked_chunk();
3753
3754 let mut events = room_linked_chunk.events();
3756
3757 let (pos, ev) = events.next().unwrap();
3758 assert_eq!(ev.event_id().as_deref(), Some(evid3));
3759 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
3760
3761 let (pos, ev) = events.next().unwrap();
3762 assert_eq!(ev.event_id().as_deref(), Some(evid4));
3763 assert_eq!(room_linked_chunk.event_order(pos), Some(3));
3764
3765 assert!(events.next().is_none());
3767
3768 assert_eq!(
3770 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
3771 Some(0)
3772 );
3773 assert_eq!(
3774 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
3775 Some(1)
3776 );
3777
3778 assert_eq!(
3781 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
3782 None
3783 );
3784 }
3785 }
3786
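// Once all subscribers to a room's event cache are gone, the in-memory linked chunk is
// automatically shrunk back to its last chunk.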
3787 #[async_test]
3788 async fn test_auto_shrink_after_all_subscribers_are_gone() {
3789 let room_id = room_id!("!galette:saucisse.bzh");
3790
3791 let client = MockClientBuilder::new(None).build().await;
3792
3793 let f = EventFactory::new().room(room_id);
3794
3795 let evid1 = event_id!("$1");
3796 let evid2 = event_id!("$2");
3797
3798 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3799 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3800
3801 {
3803 client
3804 .event_cache_store()
3805 .lock()
3806 .await
3807 .expect("Could not acquire the event cache lock")
3808 .as_clean()
3809 .expect("Could not acquire a clean event cache lock")
3810 .handle_linked_chunk_updates(
3811 LinkedChunkId::Room(room_id),
3812 vec![
3813 Update::NewItemsChunk {
3814 previous: None,
3815 new: ChunkIdentifier::new(0),
3816 next: None,
3817 },
3818 Update::PushItems {
3819 at: Position::new(ChunkIdentifier::new(0), 0),
3820 items: vec![ev1],
3821 },
3822 Update::NewItemsChunk {
3823 previous: Some(ChunkIdentifier::new(0)),
3824 new: ChunkIdentifier::new(1),
3825 next: None,
3826 },
3827 Update::PushItems {
3828 at: Position::new(ChunkIdentifier::new(1), 0),
3829 items: vec![ev2],
3830 },
3831 ],
3832 )
3833 .await
3834 .unwrap();
3835 }
3836
3837 let event_cache = client.event_cache();
3838 event_cache.subscribe().unwrap();
3839
3840 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3841 let room = client.get_room(room_id).unwrap();
3842 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3843
3844 let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
3846 assert_eq!(events1.len(), 1);
3847 assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
3848 assert!(stream1.is_empty());
3849
3850 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3851
3852 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3854 assert_eq!(outcome.events.len(), 1);
3855 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3856 assert!(outcome.reached_start);
3857
3858 assert_let_timeout!(
3861 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
3862 );
3863 assert_eq!(diffs.len(), 1);
3864 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3865 assert_eq!(value.event_id().as_deref(), Some(evid1));
3866 });
3867
3868 assert!(stream1.is_empty());
3869
3870 assert_let_timeout!(
3871 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
3872 );
3873 assert_eq!(expected_room_id, room_id);
3874 assert!(generic_stream.is_empty());
3875
3876 let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
3880 assert_eq!(events2.len(), 2);
3881 assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
3882 assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
3883 assert!(stream2.is_empty());
3884
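// Dropping the first subscriber is not enough to trigger the auto-shrink: another
// subscriber is still alive.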
3885 drop(stream1);
3887 yield_now().await;
3888
3889 assert!(stream2.is_empty());
3891
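// Dropping the last subscriber lets the auto-shrink kick in: the subscriber count drops
// to 0 and only the last chunk remains loaded.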
3892 drop(stream2);
3894 yield_now().await;
3895
3896 {
3899 let state = room_event_cache.inner.state.read().await.unwrap();
3901 assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
3902 }
3903
3904 let events3 = room_event_cache.events().await.unwrap();
3906 assert_eq!(events3.len(), 1);
3907 assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
3908 }
3909
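// `rfind_map_event_in_memory_by` walks the loaded events backwards, with access to the
// previous event; events in chunks that aren't loaded (here, the first chunk) are never
// visited.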
3910 #[async_test]
3911 async fn test_rfind_map_event_in_memory_by() {
3912 let user_id = user_id!("@mnt_io:matrix.org");
3913 let room_id = room_id!("!raclette:patate.ch");
3914 let client = MockClientBuilder::new(None).build().await;
3915
3916 let event_factory = EventFactory::new().room(room_id);
3917
3918 let event_id_0 = event_id!("$ev0");
3919 let event_id_1 = event_id!("$ev1");
3920 let event_id_2 = event_id!("$ev2");
3921 let event_id_3 = event_id!("$ev3");
3922
3923 let event_0 =
3924 event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3925 let event_1 =
3926 event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3927 let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3928 let event_3 =
3929 event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3930
3931 {
3934 client
3935 .event_cache_store()
3936 .lock()
3937 .await
3938 .expect("Could not acquire the event cache lock")
3939 .as_clean()
3940 .expect("Could not acquire a clean event cache lock")
3941 .handle_linked_chunk_updates(
3942 LinkedChunkId::Room(room_id),
3943 vec![
3944 Update::NewItemsChunk {
3945 previous: None,
3946 new: ChunkIdentifier::new(0),
3947 next: None,
3948 },
3949 Update::PushItems {
3950 at: Position::new(ChunkIdentifier::new(0), 0),
3951 items: vec![event_3],
3952 },
3953 Update::NewItemsChunk {
3954 previous: Some(ChunkIdentifier::new(0)),
3955 new: ChunkIdentifier::new(1),
3956 next: None,
3957 },
3958 Update::PushItems {
3959 at: Position::new(ChunkIdentifier::new(1), 0),
3960 items: vec![event_0, event_1, event_2],
3961 },
3962 ],
3963 )
3964 .await
3965 .unwrap();
3966 }
3967
3968 let event_cache = client.event_cache();
3969 event_cache.subscribe().unwrap();
3970
3971 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3972 let room = client.get_room(room_id).unwrap();
3973 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3974
3975 assert_matches!(
3977 room_event_cache
3978 .rfind_map_event_in_memory_by(|event, previous_event| {
3979 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB))
3980 .then(|| (event.event_id(), previous_event.and_then(|event| event.event_id())))
3980 })
3981 .await,
3982 Ok(Some((event_id, previous_event_id))) => {
3983 assert_eq!(event_id.as_deref(), Some(event_id_0));
3984 assert!(previous_event_id.is_none());
3985 }
3986 );
3987
3988 assert_matches!(
3991 room_event_cache
3992 .rfind_map_event_in_memory_by(|event, previous_event| {
3993 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE))
3994 .then(|| (event.event_id(), previous_event.and_then(|event| event.event_id())))
3994 })
3995 .await,
3996 Ok(Some((event_id, previous_event_id))) => {
3997 assert_eq!(event_id.as_deref(), Some(event_id_2));
3998 assert_eq!(previous_event_id.as_deref(), Some(event_id_1));
3999 }
4000 );
4001
4002 assert!(
4004 room_event_cache
4005 .rfind_map_event_in_memory_by(|event, _| {
4006 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
4007 == Some(user_id))
4008 .then(|| event.event_id())
4009 })
4010 .await
4011 .unwrap()
4012 .is_none()
4013 );
4014
4015 assert!(
4017 room_event_cache
4018 .rfind_map_event_in_memory_by(|_, _| None::<()>)
4019 .await
4020 .unwrap()
4021 .is_none()
4022 );
4023 }
4024
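// Two clients sharing one store simulate two processes. Whenever the other process has
// taken the cross-process lock, the local state becomes dirty and is reloaded from the
// store (shrunk to its last chunk), observed as a `Clear` + `Append` before paginating
// again.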
4025 #[async_test]
4026 async fn test_reload_when_dirty() {
4027 let user_id = user_id!("@mnt_io:matrix.org");
4028 let room_id = room_id!("!raclette:patate.ch");
4029
4030 let event_cache_store = MemoryStore::new();
4032
4033 let client_p0 = MockClientBuilder::new(None)
4035 .on_builder(|builder| {
4036 builder.store_config(
4037 StoreConfig::new("process #0".to_owned())
4038 .event_cache_store(event_cache_store.clone()),
4039 )
4040 })
4041 .build()
4042 .await;
4043
4044 let client_p1 = MockClientBuilder::new(None)
4046 .on_builder(|builder| {
4047 builder.store_config(
4048 StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
4049 )
4050 })
4051 .build()
4052 .await;
4053
4054 let event_factory = EventFactory::new().room(room_id).sender(user_id);
4055
4056 let ev_id_0 = event_id!("$ev_0");
4057 let ev_id_1 = event_id!("$ev_1");
4058
4059 let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
4060 let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
4061
4062 client_p0
4064 .event_cache_store()
4065 .lock()
4066 .await
4067 .expect("[p0] Could not acquire the event cache lock")
4068 .as_clean()
4069 .expect("[p0] Could not acquire a clean event cache lock")
4070 .handle_linked_chunk_updates(
4071 LinkedChunkId::Room(room_id),
4072 vec![
4073 Update::NewItemsChunk {
4074 previous: None,
4075 new: ChunkIdentifier::new(0),
4076 next: None,
4077 },
4078 Update::PushItems {
4079 at: Position::new(ChunkIdentifier::new(0), 0),
4080 items: vec![ev_0],
4081 },
4082 Update::NewItemsChunk {
4083 previous: Some(ChunkIdentifier::new(0)),
4084 new: ChunkIdentifier::new(1),
4085 next: None,
4086 },
4087 Update::PushItems {
4088 at: Position::new(ChunkIdentifier::new(1), 0),
4089 items: vec![ev_1],
4090 },
4091 ],
4092 )
4093 .await
4094 .unwrap();
4095
4096 let (room_event_cache_p0, room_event_cache_p1) = {
4098 let event_cache_p0 = client_p0.event_cache();
4099 event_cache_p0.subscribe().unwrap();
4100
4101 let event_cache_p1 = client_p1.event_cache();
4102 event_cache_p1.subscribe().unwrap();
4103
4104 client_p0.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
4105 client_p1.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
4106
4107 let (room_event_cache_p0, _drop_handles) =
4108 client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
4109 let (room_event_cache_p1, _drop_handles) =
4110 client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
4111
4112 (room_event_cache_p0, room_event_cache_p1)
4113 };
4114
4115 let mut updates_stream_p0 = {
4120 let room_event_cache = &room_event_cache_p0;
4121
4122 let (initial_updates, mut updates_stream) =
4123 room_event_cache_p0.subscribe().await.unwrap();
4124
4125 assert_eq!(initial_updates.len(), 1);
4127 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
4128 assert!(updates_stream.is_empty());
4129
4130 assert!(event_loaded(room_event_cache, ev_id_1).await);
4132
4133 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4135
4136 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4138
4139 assert_matches!(
4141 updates_stream.recv().await.unwrap(),
4142 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4143 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4144 assert_matches!(
4145 &diffs[0],
4146 VectorDiff::Insert { index: 0, value: event } => {
4147 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4148 }
4149 );
4150 }
4151 );
4152
4153 assert!(event_loaded(room_event_cache, ev_id_0).await);
4155
4156 updates_stream
4157 };
4158
4159 let mut updates_stream_p1 = {
4161 let room_event_cache = &room_event_cache_p1;
4162 let (initial_updates, mut updates_stream) =
4163 room_event_cache_p1.subscribe().await.unwrap();
4164
4165 assert_eq!(initial_updates.len(), 1);
4167 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
4168 assert!(updates_stream.is_empty());
4169
4170 assert!(event_loaded(room_event_cache, ev_id_1).await);
4172
4173 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4175
4176 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4178
4179 assert_matches!(
4181 updates_stream.recv().await.unwrap(),
4182 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4183 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4184 assert_matches!(
4185 &diffs[0],
4186 VectorDiff::Insert { index: 0, value: event } => {
4187 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4188 }
4189 );
4190 }
4191 );
4192
4193 assert!(event_loaded(room_event_cache, ev_id_0).await);
4195
4196 updates_stream
4197 };
4198
4199 for _ in 0..3 {
4201 {
4205 let room_event_cache = &room_event_cache_p0;
4206 let updates_stream = &mut updates_stream_p0;
4207
4208 assert!(event_loaded(room_event_cache, ev_id_1).await);
4210
4211 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4214
4215 assert_matches!(
4217 updates_stream.recv().await.unwrap(),
4218 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4219 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4220 assert_matches!(&diffs[0], VectorDiff::Clear);
4221 assert_matches!(
4222 &diffs[1],
4223 VectorDiff::Append { values: events } => {
4224 assert_eq!(events.len(), 1);
4225 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4226 }
4227 );
4228 }
4229 );
4230
4231 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4233
4234 assert!(event_loaded(room_event_cache, ev_id_0).await);
4236
4237 assert_matches!(
4239 updates_stream.recv().await.unwrap(),
4240 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4241 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4242 assert_matches!(
4243 &diffs[0],
4244 VectorDiff::Insert { index: 0, value: event } => {
4245 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4246 }
4247 );
4248 }
4249 );
4250 }
4251
4252 {
4256 let room_event_cache = &room_event_cache_p1;
4257 let updates_stream = &mut updates_stream_p1;
4258
4259 assert!(event_loaded(room_event_cache, ev_id_1).await);
4261
4262 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4265
4266 assert_matches!(
4268 updates_stream.recv().await.unwrap(),
4269 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4270 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4271 assert_matches!(&diffs[0], VectorDiff::Clear);
4272 assert_matches!(
4273 &diffs[1],
4274 VectorDiff::Append { values: events } => {
4275 assert_eq!(events.len(), 1);
4276 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4277 }
4278 );
4279 }
4280 );
4281
4282 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4284
4285 assert!(event_loaded(room_event_cache, ev_id_0).await);
4287
4288 assert_matches!(
4290 updates_stream.recv().await.unwrap(),
4291 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4292 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4293 assert_matches!(
4294 &diffs[0],
4295 VectorDiff::Insert { index: 0, value: event } => {
4296 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4297 }
4298 );
4299 }
4300 );
4301 }
4302 }
4303
4304 for _ in 0..3 {
4307 {
4308 let room_event_cache = &room_event_cache_p0;
4309 let updates_stream = &mut updates_stream_p0;
4310
4311 let guard = room_event_cache.inner.state.read().await.unwrap();
4312
4313 assert!(guard.is_dirty().not());
4319
4320 assert_matches!(
4322 updates_stream.recv().await.unwrap(),
4323 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4324 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4325 assert_matches!(&diffs[0], VectorDiff::Clear);
4326 assert_matches!(
4327 &diffs[1],
4328 VectorDiff::Append { values: events } => {
4329 assert_eq!(events.len(), 1);
4330 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4331 }
4332 );
4333 }
4334 );
4335
4336 assert!(event_loaded(room_event_cache, ev_id_1).await);
4337 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4338
4339 drop(guard);
4345
4346 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4347 assert!(event_loaded(room_event_cache, ev_id_0).await);
4348
4349 assert_matches!(
4351 updates_stream.recv().await.unwrap(),
4352 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4353 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4354 assert_matches!(
4355 &diffs[0],
4356 VectorDiff::Insert { index: 0, value: event } => {
4357 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4358 }
4359 );
4360 }
4361 );
4362 }
4363
4364 {
4365 let room_event_cache = &room_event_cache_p1;
4366 let updates_stream = &mut updates_stream_p1;
4367
4368 let guard = room_event_cache.inner.state.read().await.unwrap();
4369
4370 assert!(guard.is_dirty().not());
4375
4376 assert_matches!(
4378 updates_stream.recv().await.unwrap(),
4379 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4380 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4381 assert_matches!(&diffs[0], VectorDiff::Clear);
4382 assert_matches!(
4383 &diffs[1],
4384 VectorDiff::Append { values: events } => {
4385 assert_eq!(events.len(), 1);
4386 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4387 }
4388 );
4389 }
4390 );
4391
4392 assert!(event_loaded(room_event_cache, ev_id_1).await);
4393 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4394
4395 drop(guard);
4401
4402 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4403 assert!(event_loaded(room_event_cache, ev_id_0).await);
4404
4405 assert_matches!(
4407 updates_stream.recv().await.unwrap(),
4408 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4409 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4410 assert_matches!(
4411 &diffs[0],
4412 VectorDiff::Insert { index: 0, value: event } => {
4413 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4414 }
4415 );
4416 }
4417 );
4418 }
4419 }
4420
4421 for _ in 0..3 {
4423 {
4424 let room_event_cache = &room_event_cache_p0;
4425 let updates_stream = &mut updates_stream_p0;
4426
4427 let guard = room_event_cache.inner.state.write().await.unwrap();
4428
4429 assert!(guard.is_dirty().not());
4431
4432 assert_matches!(
4434 updates_stream.recv().await.unwrap(),
4435 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4436 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4437 assert_matches!(&diffs[0], VectorDiff::Clear);
4438 assert_matches!(
4439 &diffs[1],
4440 VectorDiff::Append { values: events } => {
4441 assert_eq!(events.len(), 1);
4442 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4443 }
4444 );
4445 }
4446 );
4447
4448 drop(guard);
4451
4452 assert!(event_loaded(room_event_cache, ev_id_1).await);
4453 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4454
4455 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4456 assert!(event_loaded(room_event_cache, ev_id_0).await);
4457
4458 assert_matches!(
4460 updates_stream.recv().await.unwrap(),
4461 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4462 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4463 assert_matches!(
4464 &diffs[0],
4465 VectorDiff::Insert { index: 0, value: event } => {
4466 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4467 }
4468 );
4469 }
4470 );
4471 }
4472
4473 {
4474 let room_event_cache = &room_event_cache_p1;
4475 let updates_stream = &mut updates_stream_p1;
4476
4477 let guard = room_event_cache.inner.state.write().await.unwrap();
4478
4479 assert!(guard.is_dirty().not());
4481
4482 assert_matches!(
4484 updates_stream.recv().await.unwrap(),
4485 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4486 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4487 assert_matches!(&diffs[0], VectorDiff::Clear);
4488 assert_matches!(
4489 &diffs[1],
4490 VectorDiff::Append { values: events } => {
4491 assert_eq!(events.len(), 1);
4492 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4493 }
4494 );
4495 }
4496 );
4497
4498 drop(guard);
4501
4502 assert!(event_loaded(room_event_cache, ev_id_1).await);
4503 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4504
4505 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4506 assert!(event_loaded(room_event_cache, ev_id_0).await);
4507
4508 assert_matches!(
4510 updates_stream.recv().await.unwrap(),
4511 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4512 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4513 assert_matches!(
4514 &diffs[0],
4515 VectorDiff::Insert { index: 0, value: event } => {
4516 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4517 }
4518 );
4519 }
4520 );
4521 }
4522 }
4523 }
4524
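// Loading a room's state for the first time, after the cross-process lock has already
// changed hands, must not leave the freshly loaded state marked as dirty.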
4525 #[async_test]
4526 async fn test_load_when_dirty() {
4527 let room_id_0 = room_id!("!raclette:patate.ch");
4528 let room_id_1 = room_id!("!morbiflette:patate.ch");
4529
4530 let event_cache_store = MemoryStore::new();
4532
4533 let client_p0 = MockClientBuilder::new(None)
4535 .on_builder(|builder| {
4536 builder.store_config(
4537 StoreConfig::new("process #0".to_owned())
4538 .event_cache_store(event_cache_store.clone()),
4539 )
4540 })
4541 .build()
4542 .await;
4543
4544 let client_p1 = MockClientBuilder::new(None)
4546 .on_builder(|builder| {
4547 builder.store_config(
4548 StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
4549 )
4550 })
4551 .build()
4552 .await;
4553
4554 let (room_event_cache_0_p0, room_event_cache_0_p1) = {
4556 let event_cache_p0 = client_p0.event_cache();
4557 event_cache_p0.subscribe().unwrap();
4558
4559 let event_cache_p1 = client_p1.event_cache();
4560 event_cache_p1.subscribe().unwrap();
4561
4562 client_p0
4563 .base_client()
4564 .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4565 client_p0
4566 .base_client()
4567 .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4568
4569 client_p1
4570 .base_client()
4571 .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4572 client_p1
4573 .base_client()
4574 .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4575
4576 let (room_event_cache_0_p0, _drop_handles) =
4577 client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4578 let (room_event_cache_0_p1, _drop_handles) =
4579 client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4580
4581 (room_event_cache_0_p0, room_event_cache_0_p1)
4582 };
4583
4584 {
4586 drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
4587 drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
4588 }
4589
4590 let (room_event_cache_1_p0, _) =
4594 client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
4595
4596 {
4598 let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
4599 assert!(guard.is_dirty().not());
4600 }
4601
4602 }
4605
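// Returns whether the given event is present in the in-memory (loaded) part of the
// room's linked chunk.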
4606 async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
4607 room_event_cache
4608 .rfind_map_event_in_memory_by(|event, _previous_event_id| {
4609 (event.event_id().as_deref() == Some(event_id)).then_some(())
4610 })
4611 .await
4612 .unwrap()
4613 .is_some()
4614 }
4615}