use std::{
    collections::BTreeMap,
    fmt,
    ops::{Deref, DerefMut},
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use events::sort_positions_descending;
use eyeball::SharedObservable;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
    deserialized_responses::AmbiguityChange,
    event_cache::Event,
    linked_chunk::Position,
    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
    EventId, OwnedEventId, OwnedRoomId,
    api::Direction,
    events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
    serde::Raw,
};
use tokio::sync::{
    Notify,
    broadcast::{Receiver, Sender},
    mpsc,
};
use tracing::{instrument, trace, warn};

use super::{
    AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
    RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
};
use crate::{
    client::WeakClient,
    event_cache::EventCacheError,
    room::{IncludeRelations, RelationsOptions, WeakRoom},
};

pub(super) mod events;
mod threads;

pub use threads::ThreadEventCacheUpdate;

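/// A per-room view over the event cache, cheap to clone and shared between
/// callers; all the actual state lives behind an `Arc`.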
#[derive(Clone)]
pub struct RoomEventCache {
    pub(super) inner: Arc<RoomEventCacheInner>,
}

impl fmt::Debug for RoomEventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RoomEventCache").finish_non_exhaustive()
    }
}

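/// A subscriber to a [`RoomEventCache`]'s updates.
///
/// When the last subscriber for a room is dropped, a notification is sent on
/// the auto-shrink channel so the room's linked chunk can be shrunk back to
/// its last chunk.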
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheSubscriber {
    recv: Receiver<RoomEventCacheUpdate>,

    room_id: OwnedRoomId,

    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    subscriber_count: Arc<AtomicUsize>,
}

impl Drop for RoomEventCacheSubscriber {
    fn drop(&mut self) {
        let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);

        trace!(
            "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
        );

        if previous_subscriber_count == 1 {
            let mut room_id = self.room_id.clone();

            let mut num_attempts = 0;

            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
                num_attempts += 1;

                if num_attempts > 1024 {
                    warn!(
                        "couldn't send notification to the auto-shrink channel \
                         after 1024 attempts; giving up"
                    );
                    return;
                }

                match err {
                    mpsc::error::TrySendError::Full(stolen_room_id) => {
                        room_id = stolen_room_id;
                    }
                    mpsc::error::TrySendError::Closed(_) => return,
                }
            }

            trace!("sent notification to the parent channel that we were the last subscriber");
        }
    }
}

impl Deref for RoomEventCacheSubscriber {
    type Target = Receiver<RoomEventCacheUpdate>;

    fn deref(&self) -> &Self::Target {
        &self.recv
    }
}

impl DerefMut for RoomEventCacheSubscriber {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.recv
    }
}

impl RoomEventCache {
    pub(super) fn new(
        client: WeakClient,
        state: RoomEventCacheStateLock,
        pagination_status: SharedObservable<RoomPaginationStatus>,
        room_id: OwnedRoomId,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        update_sender: Sender<RoomEventCacheUpdate>,
        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
    ) -> Self {
        Self {
            inner: Arc::new(RoomEventCacheInner::new(
                client,
                state,
                pagination_status,
                room_id,
                auto_shrink_sender,
                update_sender,
                generic_update_sender,
            )),
        }
    }

    pub async fn events(&self) -> Result<Vec<Event>> {
        let state = self.inner.state.read().await?;

        Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
    }

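    /// Returns the events currently loaded in memory for this room, along with
    /// a subscriber to further updates.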
    pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
        let state = self.inner.state.read().await?;
        let events =
            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();

        let subscriber_count = state.subscriber_count();
        let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);

        let recv = self.inner.update_sender.subscribe();
        let subscriber = RoomEventCacheSubscriber {
            recv,
            room_id: self.inner.room_id.clone(),
            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
            subscriber_count: subscriber_count.clone(),
        };

        Ok((events, subscriber))
    }

    pub async fn subscribe_to_thread(
        &self,
        thread_root: OwnedEventId,
    ) -> Result<(Vec<Event>, Receiver<ThreadEventCacheUpdate>)> {
        let mut state = self.inner.state.write().await?;
        Ok(state.subscribe_to_thread(thread_root))
    }

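    /// Paginates a thread backwards over the network, using the `/relations`
    /// endpoint.
    ///
    /// Returns `true` when the start of the thread has been reached.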
    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
    pub async fn paginate_thread_backwards(
        &self,
        thread_root: OwnedEventId,
        num_events: u16,
    ) -> Result<bool> {
        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;

        let mut outcome =
            self.inner.state.write().await?.load_more_thread_events_backwards(thread_root.clone());

        loop {
            match outcome {
                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
                    let options = RelationsOptions {
                        from: prev_token.clone(),
                        dir: Direction::Backward,
                        limit: Some(num_events.into()),
                        include_relations: IncludeRelations::AllRelations,
                        recurse: true,
                    };

                    let mut result = room
                        .relations(thread_root.clone(), options)
                        .await
                        .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

                    let reached_start = result.next_batch_token.is_none();
                    trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");

                    let root_event = if reached_start {
                        Some(room.load_or_fetch_event(&thread_root, None).await.map_err(
                            |err| EventCacheError::BackpaginationError(Box::new(err)),
                        )?)
                    } else {
                        None
                    };

                    let mut state = self.inner.state.write().await?;

                    state.save_events(result.chunk.iter().cloned()).await?;

                    result.chunk.extend(root_event);

                    if let Some(outcome) = state.finish_thread_network_pagination(
                        thread_root.clone(),
                        prev_token,
                        result.next_batch_token,
                        result.chunk,
                    ) {
                        return Ok(outcome.reached_start);
                    }

                    outcome = state.load_more_thread_events_backwards(thread_root.clone());
                }

                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
                    return Ok(true);
                }

                LoadMoreEventsBackwardsOutcome::Events { .. } => {
                    unimplemented!("loading from disk for threads is not implemented yet");
                }
            }
        }
    }

    pub fn pagination(&self) -> RoomPagination {
        RoomPagination { inner: self.inner.clone() }
    }

    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
    where
        P: FnMut(&Event) -> Option<O>,
    {
        Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
    }

    pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
        Ok(self
            .inner
            .state
            .read()
            .await?
            .find_event(event_id)
            .await
            .ok()
            .flatten()
            .map(|(_loc, event)| event))
    }

    pub async fn find_event_with_relations(
        &self,
        event_id: &EventId,
        filter: Option<Vec<RelationType>>,
    ) -> Result<Option<(Event, Vec<Event>)>> {
        Ok(self
            .inner
            .state
            .read()
            .await?
            .find_event_with_relations(event_id, filter.clone())
            .await
            .ok()
            .flatten())
    }

    pub async fn clear(&self) -> Result<()> {
        let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;

        let _ = self.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
            diffs: updates_as_vector_diffs,
            origin: EventsOrigin::Cache,
        });

        let _ = self
            .inner
            .generic_update_sender
            .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });

        Ok(())
    }

    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
        match self.inner.state.write().await {
            Ok(mut state_guard) => {
                if let Err(err) = state_guard.save_events(events).await {
                    warn!("couldn't save event in the event cache: {err}");
                }
            }

            Err(err) => {
                warn!("couldn't save event in the event cache: {err}");
            }
        }
    }

    pub async fn debug_string(&self) -> Vec<String> {
        match self.inner.state.read().await {
            Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
            Err(err) => {
                warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");

                vec![]
            }
        }
    }
}

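/// The shared inner state of a [`RoomEventCache`], kept behind an `Arc`.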
pub(super) struct RoomEventCacheInner {
    pub(super) room_id: OwnedRoomId,

    pub weak_room: WeakRoom,

    pub state: RoomEventCacheStateLock,

    pub pagination_batch_token_notifier: Notify,

    pub pagination_status: SharedObservable<RoomPaginationStatus>,

    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    pub update_sender: Sender<RoomEventCacheUpdate>,

    pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
}

impl RoomEventCacheInner {
    fn new(
        client: WeakClient,
        state: RoomEventCacheStateLock,
        pagination_status: SharedObservable<RoomPaginationStatus>,
        room_id: OwnedRoomId,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        update_sender: Sender<RoomEventCacheUpdate>,
        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
    ) -> Self {
        let weak_room = WeakRoom::new(client, room_id);

        Self {
            room_id: weak_room.room_id().to_owned(),
            weak_room,
            state,
            update_sender,
            pagination_batch_token_notifier: Default::default(),
            auto_shrink_sender,
            pagination_status,
            generic_update_sender,
        }
    }

    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
        if account_data.is_empty() {
            return;
        }

        let mut handled_read_marker = false;

        trace!("Handling account data");

        for raw_event in account_data {
            match raw_event.deserialize() {
                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
                    if handled_read_marker {
                        continue;
                    }

                    handled_read_marker = true;

                    let _ = self.update_sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
                        event_id: ev.content.event_id,
                    });
                }

                Ok(_) => {}

                Err(e) => {
                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
                    warn!(event_type, "Failed to deserialize account data: {e}");
                }
            }
        }
    }

    #[instrument(skip_all, fields(room_id = %self.room_id))]
    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
        self.handle_timeline(
            updates.timeline,
            updates.ephemeral.clone(),
            updates.ambiguity_changes,
        )
        .await?;
        self.handle_account_data(updates.account_data);

        Ok(())
    }

    #[instrument(skip_all, fields(room_id = %self.room_id))]
    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
        self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;

        Ok(())
    }

    async fn handle_timeline(
        &self,
        timeline: Timeline,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        if timeline.events.is_empty()
            && timeline.prev_batch.is_none()
            && ephemeral_events.is_empty()
            && ambiguity_changes.is_empty()
        {
            return Ok(());
        }

        trace!("adding new events");

        let (stored_prev_batch_token, timeline_event_diffs) =
            self.state.write().await?.handle_sync(timeline).await?;

        if stored_prev_batch_token {
            self.pagination_batch_token_notifier.notify_one();
        }

        if !timeline_event_diffs.is_empty() {
            let _ = self.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: timeline_event_diffs,
                origin: EventsOrigin::Sync,
            });

            let _ = self
                .generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
        }

        if !ephemeral_events.is_empty() {
            let _ = self
                .update_sender
                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
        }

        if !ambiguity_changes.is_empty() {
            let _ =
                self.update_sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
        }

        Ok(())
    }
}

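/// The outcome of attempting to load more events backwards: either more events
/// could be loaded from the store, the start of the timeline was reached, or a
/// gap must be filled over the network.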
#[derive(Debug)]
pub(super) enum LoadMoreEventsBackwardsOutcome {
    Gap {
        prev_token: Option<String>,
    },

    StartOfTimeline,

    Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
}

mod private {
    use std::{
        collections::{BTreeMap, HashMap, HashSet},
        sync::{
            Arc,
            atomic::{AtomicBool, AtomicUsize, Ordering},
        },
    };

    use eyeball::SharedObservable;
    use eyeball_im::VectorDiff;
    use matrix_sdk_base::{
        apply_redaction,
        deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
        event_cache::{
            Event, Gap,
            store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState},
        },
        linked_chunk::{
            ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
            OwnedLinkedChunkId, Position, Update, lazy_loader,
        },
        serde_helpers::{extract_edit_target, extract_thread_root},
        sync::Timeline,
    };
    use matrix_sdk_common::executor::spawn;
    use ruma::{
        EventId, OwnedEventId, OwnedRoomId, RoomId,
        events::{
            AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
            relation::RelationType, room::redaction::SyncRoomRedactionEvent,
        },
        room_version_rules::RoomVersionRules,
        serde::Raw,
    };
    use tokio::sync::{
        Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
        broadcast::{Receiver, Sender},
    };
    use tracing::{debug, error, instrument, trace, warn};

    use super::{
        super::{
            BackPaginationOutcome, EventCacheError, RoomEventCacheLinkedChunkUpdate,
            RoomPaginationStatus, ThreadEventCacheUpdate,
            deduplicator::{DeduplicationOutcome, filter_duplicate_events},
            room::threads::ThreadEventCache,
        },
        EventLocation, EventsOrigin, LoadMoreEventsBackwardsOutcome, RoomEventCacheGenericUpdate,
        RoomEventCacheUpdate,
        events::EventLinkedChunk,
        sort_positions_descending,
    };

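    /// A lock guarding the state of a room's event cache, coordinating access
    /// to both the in-memory linked chunk and the underlying store.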
    pub struct RoomEventCacheStateLock {
        locked_state: RwLock<RoomEventCacheStateLockInner>,

        read_lock_acquisition: Mutex<()>,
    }

    struct RoomEventCacheStateLockInner {
        enabled_thread_support: bool,

        room_id: OwnedRoomId,

        store: EventCacheStoreLock,

        room_linked_chunk: EventLinkedChunk,

        threads: HashMap<OwnedEventId, ThreadEventCache>,

        pagination_status: SharedObservable<RoomPaginationStatus>,

        update_sender: Sender<RoomEventCacheUpdate>,

        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

        room_version_rules: RoomVersionRules,

        waited_for_initial_prev_token: Arc<AtomicBool>,

        subscriber_count: Arc<AtomicUsize>,
    }

    impl RoomEventCacheStateLock {
        #[allow(clippy::too_many_arguments)]
        pub async fn new(
            room_id: OwnedRoomId,
            room_version_rules: RoomVersionRules,
            enabled_thread_support: bool,
            update_sender: Sender<RoomEventCacheUpdate>,
            generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
            linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
            store: EventCacheStoreLock,
            pagination_status: SharedObservable<RoomPaginationStatus>,
        ) -> Result<Self, EventCacheError> {
            let store_guard = match store.lock().await? {
                EventCacheStoreLockState::Clean(guard) => guard,

                EventCacheStoreLockState::Dirty(guard) => {
                    EventCacheStoreLockGuard::clear_dirty(&guard);

                    guard
                }
            };

            let linked_chunk_id = LinkedChunkId::Room(&room_id);

            let full_linked_chunk_metadata =
                match load_linked_chunk_metadata(&store_guard, linked_chunk_id).await {
                    Ok(metas) => metas,
                    Err(err) => {
                        error!(
                            "error when loading a linked chunk's metadata from the store: {err}"
                        );

                        store_guard
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        None
                    }
                };

            let linked_chunk = match store_guard
                .load_last_chunk(linked_chunk_id)
                .await
                .map_err(EventCacheError::from)
                .and_then(|(last_chunk, chunk_identifier_generator)| {
                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
                        .map_err(EventCacheError::from)
                }) {
                Ok(linked_chunk) => linked_chunk,
                Err(err) => {
                    error!(
                        "error when loading a linked chunk's latest chunk from the store: {err}"
                    );

                    store_guard
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    None
                }
            };

            let waited_for_initial_prev_token = Arc::new(AtomicBool::new(false));

            Ok(Self {
                locked_state: RwLock::new(RoomEventCacheStateLockInner {
                    enabled_thread_support,
                    room_id,
                    store,
                    room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
                        linked_chunk,
                        full_linked_chunk_metadata,
                    ),
                    threads: HashMap::new(),
                    pagination_status,
                    update_sender,
                    generic_update_sender,
                    linked_chunk_update_sender,
                    room_version_rules,
                    waited_for_initial_prev_token,
                    subscriber_count: Default::default(),
                }),
                read_lock_acquisition: Mutex::new(()),
            })
        }

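        /// Acquires a read guard over the state and the store.
        ///
        /// If the store lock is dirty (another process touched the store), the
        /// state is first shrunk to its last chunk under a write guard and
        /// subscribers are notified, before the guard is downgraded back to a
        /// read guard.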
        pub async fn read(&self) -> Result<RoomEventCacheStateLockReadGuard<'_>, EventCacheError> {
            let _one_reader_guard = self.read_lock_acquisition.lock().await;

            let state_guard = self.locked_state.read().await;

            match state_guard.store.lock().await? {
                EventCacheStoreLockState::Clean(store_guard) => {
                    Ok(RoomEventCacheStateLockReadGuard { state: state_guard, store: store_guard })
                }
                EventCacheStoreLockState::Dirty(store_guard) => {
                    drop(state_guard);
                    let state_guard = self.locked_state.write().await;

                    let mut guard = RoomEventCacheStateLockWriteGuard {
                        state: state_guard,
                        store: store_guard,
                    };

                    let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;

                    EventCacheStoreLockGuard::clear_dirty(&guard.store);

                    let guard = guard.downgrade();

                    if !updates_as_vector_diffs.is_empty() {
                        let _ = guard.state.update_sender.send(
                            RoomEventCacheUpdate::UpdateTimelineEvents {
                                diffs: updates_as_vector_diffs,
                                origin: EventsOrigin::Cache,
                            },
                        );

                        let _ =
                            guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
                                room_id: guard.state.room_id.clone(),
                            });
                    }

                    Ok(guard)
                }
            }
        }

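        /// Acquires a write guard over the state and the store, shrinking the
        /// in-memory linked chunk first (and notifying subscribers) if the
        /// store lock was dirty.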
        pub async fn write(
            &self,
        ) -> Result<RoomEventCacheStateLockWriteGuard<'_>, EventCacheError> {
            let state_guard = self.locked_state.write().await;

            match state_guard.store.lock().await? {
                EventCacheStoreLockState::Clean(store_guard) => {
                    Ok(RoomEventCacheStateLockWriteGuard { state: state_guard, store: store_guard })
                }
                EventCacheStoreLockState::Dirty(store_guard) => {
                    let mut guard = RoomEventCacheStateLockWriteGuard {
                        state: state_guard,
                        store: store_guard,
                    };

                    let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;

                    EventCacheStoreLockGuard::clear_dirty(&guard.store);

                    if !updates_as_vector_diffs.is_empty() {
                        let _ = guard.state.update_sender.send(
                            RoomEventCacheUpdate::UpdateTimelineEvents {
                                diffs: updates_as_vector_diffs,
                                origin: EventsOrigin::Cache,
                            },
                        );

                        let _ =
                            guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
                                room_id: guard.state.room_id.clone(),
                            });
                    }

                    Ok(guard)
                }
            }
        }
    }

    pub struct RoomEventCacheStateLockReadGuard<'a> {
        state: RwLockReadGuard<'a, RoomEventCacheStateLockInner>,

        store: EventCacheStoreLockGuard,
    }

    pub struct RoomEventCacheStateLockWriteGuard<'a> {
        state: RwLockWriteGuard<'a, RoomEventCacheStateLockInner>,

        store: EventCacheStoreLockGuard,
    }

    impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
        fn downgrade(self) -> RoomEventCacheStateLockReadGuard<'a> {
            RoomEventCacheStateLockReadGuard { state: self.state.downgrade(), store: self.store }
        }
    }

    impl<'a> RoomEventCacheStateLockReadGuard<'a> {
        pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
            &self.state.room_linked_chunk
        }

        pub fn subscriber_count(&self) -> &Arc<AtomicUsize> {
            &self.state.subscriber_count
        }

        pub async fn find_event(
            &self,
            event_id: &EventId,
        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
            find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
                .await
        }

        pub async fn find_event_with_relations(
            &self,
            event_id: &EventId,
            filters: Option<Vec<RelationType>>,
        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
            find_event_with_relations(
                event_id,
                &self.state.room_id,
                filters,
                &self.state.room_linked_chunk,
                &self.store,
            )
            .await
        }

        pub fn rfind_map_event_in_memory_by<O, P>(&self, mut predicate: P) -> Option<O>
        where
            P: FnMut(&Event) -> Option<O>,
        {
            self.state.room_linked_chunk.revents().find_map(|(_position, event)| predicate(event))
        }

        #[cfg(test)]
        pub fn is_dirty(&self) -> bool {
            EventCacheStoreLockGuard::is_dirty(&self.store)
        }
    }

    impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
        #[cfg(any(feature = "e2e-encryption", test))]
        pub fn room_linked_chunk(&mut self) -> &mut EventLinkedChunk {
            &mut self.state.room_linked_chunk
        }

        pub fn waited_for_initial_prev_token(&self) -> &Arc<AtomicBool> {
            &self.state.waited_for_initial_prev_token
        }

        pub async fn find_event(
            &self,
            event_id: &EventId,
        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
            find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
                .await
        }

        pub async fn find_event_with_relations(
            &self,
            event_id: &EventId,
            filters: Option<Vec<RelationType>>,
        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
            find_event_with_relations(
                event_id,
                &self.state.room_id,
                filters,
                &self.state.room_linked_chunk,
                &self.store,
            )
            .await
        }

        pub async fn load_more_events_backwards(
            &mut self,
        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
            if let Some(prev_token) = self.state.room_linked_chunk.rgap().map(|gap| gap.prev_token)
            {
                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
            }

            let prev_first_chunk = self
                .state
                .room_linked_chunk
                .chunks()
                .next()
                .expect("a linked chunk is never empty");

            let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
            let new_first_chunk = match self
                .store
                .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
                .await
            {
                Ok(Some(new_first_chunk)) => new_first_chunk,

                Ok(None) => {
                    if self.state.room_linked_chunk.events().next().is_some() {
                        trace!("chunk is fully loaded and non-empty: reached_start=true");
                        return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
                    }

                    return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: None });
                }

                Err(err) => {
                    error!("error when loading the previous chunk of a linked chunk: {err}");

                    self.store
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    return Err(err.into());
                }
            };

            let chunk_content = new_first_chunk.content.clone();

            let reached_start = new_first_chunk.previous.is_none();

            if let Err(err) =
                self.state.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk)
            {
                error!("error when inserting the previous chunk into its linked chunk: {err}");

                self.store
                    .handle_linked_chunk_updates(
                        LinkedChunkId::Room(&self.state.room_id),
                        vec![Update::Clear],
                    )
                    .await?;

                return Err(err.into());
            }

            let _ = self.state.room_linked_chunk.store_updates().take();

            let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

            Ok(match chunk_content {
                ChunkContent::Gap(gap) => {
                    trace!("reloaded chunk from disk (gap)");
                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
                }

                ChunkContent::Items(events) => {
                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
                    LoadMoreEventsBackwardsOutcome::Events {
                        events,
                        timeline_event_diffs,
                        reached_start,
                    }
                }
            })
        }

        pub async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
            let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
            let (last_chunk, chunk_identifier_generator) =
                match self.store.load_last_chunk(linked_chunk_id).await {
                    Ok(pair) => pair,

                    Err(err) => {
                        error!("error when reloading a linked chunk from memory: {err}");

                        self.store
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        (None, ChunkIdentifierGenerator::new_from_scratch())
                    }
                };

            debug!("unloading the linked chunk, and resetting it to its last chunk");

            if let Err(err) =
                self.state.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
            {
                error!("error when replacing the linked chunk: {err}");
                return self.reset_internal().await;
            }

            self.state
                .pagination_status
                .set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            let _ = self.state.room_linked_chunk.store_updates().take();

            Ok(())
        }

        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn auto_shrink_if_no_subscribers(
            &mut self,
        ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
            let subscriber_count = self.state.subscriber_count.load(Ordering::SeqCst);

            trace!(subscriber_count, "received request to auto-shrink");

            if subscriber_count == 0 {
                self.shrink_to_last_chunk().await?;

                Ok(Some(self.state.room_linked_chunk.updates_as_vector_diffs()))
            } else {
                Ok(None)
            }
        }

        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn force_shrink_to_last_chunk(
            &mut self,
        ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
            self.shrink_to_last_chunk().await?;

            Ok(self.state.room_linked_chunk.updates_as_vector_diffs())
        }

        #[instrument(skip_all)]
        pub async fn remove_events(
            &mut self,
            in_memory_events: Vec<(OwnedEventId, Position)>,
            in_store_events: Vec<(OwnedEventId, Position)>,
        ) -> Result<(), EventCacheError> {
            if !in_store_events.is_empty() {
                let mut positions = in_store_events
                    .into_iter()
                    .map(|(_event_id, position)| position)
                    .collect::<Vec<_>>();

                sort_positions_descending(&mut positions);

                let updates = positions
                    .into_iter()
                    .map(|pos| Update::RemoveItem { at: pos })
                    .collect::<Vec<_>>();

                self.apply_store_only_updates(updates).await?;
            }

            if in_memory_events.is_empty() {
                return Ok(());
            }

            self.state
                .room_linked_chunk
                .remove_events_by_position(
                    in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
                )
                .expect("failed to remove an event");

            self.propagate_changes().await
        }

        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
            let updates = self.state.room_linked_chunk.store_updates().take();
            self.send_updates_to_store(updates).await
        }

        async fn apply_store_only_updates(
            &mut self,
            updates: Vec<Update<Event, Gap>>,
        ) -> Result<(), EventCacheError> {
            self.state.room_linked_chunk.order_tracker.map_updates(&updates);
            self.send_updates_to_store(updates).await
        }

        async fn send_updates_to_store(
            &mut self,
            mut updates: Vec<Update<Event, Gap>>,
        ) -> Result<(), EventCacheError> {
            if updates.is_empty() {
                return Ok(());
            }

            for update in updates.iter_mut() {
                match update {
                    Update::PushItems { items, .. } => strip_relations_from_events(items),
                    Update::ReplaceItem { item, .. } => strip_relations_from_event(item),
                    Update::NewItemsChunk { .. }
                    | Update::NewGapChunk { .. }
                    | Update::RemoveChunk(_)
                    | Update::RemoveItem { .. }
                    | Update::DetachLastItems { .. }
                    | Update::StartReattachItems
                    | Update::EndReattachItems
                    | Update::Clear => {}
                }
            }

            let store = self.store.clone();
            let room_id = self.state.room_id.clone();
            let cloned_updates = updates.clone();

            spawn(async move {
                trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
                let linked_chunk_id = LinkedChunkId::Room(&room_id);
                store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
                trace!("linked chunk updates applied");

                super::Result::Ok(())
            })
            .await
            .expect("joining failed")?;

            let _ = self.state.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
                linked_chunk_id: OwnedLinkedChunkId::Room(self.state.room_id.clone()),
                updates,
            });

            Ok(())
        }

        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
            self.reset_internal().await?;

            let diff_updates = self.state.room_linked_chunk.updates_as_vector_diffs();

            debug_assert_eq!(diff_updates.len(), 1);
            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));

            Ok(diff_updates)
        }

        async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
            self.state.room_linked_chunk.reset();

            for thread in self.state.threads.values_mut() {
                thread.clear();
            }

            self.propagate_changes().await?;

            self.state.waited_for_initial_prev_token.store(false, Ordering::SeqCst);
            self.state
                .pagination_status
                .set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            Ok(())
        }

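        /// Handles the timeline of a sync response: deduplicates the received
        /// events, removes stale duplicates, pushes the new live events (and a
        /// gap, if any), and returns whether a new gap was stored along with
        /// the resulting timeline diffs.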
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn handle_sync(
            &mut self,
            mut timeline: Timeline,
        ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
            let mut prev_batch = timeline.prev_batch.take();

            let DeduplicationOutcome {
                all_events: events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
                non_empty_all_duplicates: all_duplicates,
            } = filter_duplicate_events(
                &self.store,
                LinkedChunkId::Room(&self.state.room_id),
                &self.state.room_linked_chunk,
                timeline.events,
            )
            .await?;

            if !timeline.limited && self.state.room_linked_chunk.events().next().is_some()
                || all_duplicates
            {
                prev_batch = None;
            }

            if prev_batch.is_some() {
                let mut summaries_to_update = Vec::new();

                for (thread_root, thread) in self.state.threads.iter_mut() {
                    thread.clear();

                    summaries_to_update.push(thread_root.clone());
                }

                for thread_root in summaries_to_update {
                    let Some((location, mut target_event)) = self.find_event(&thread_root).await?
                    else {
                        trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
                        continue;
                    };

                    if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
                        prev_summary.latest_reply = None;

                        target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);

                        self.replace_event_at(location, target_event).await?;
                    }
                }
            }

            if all_duplicates {
                return Ok((false, Vec::new()));
            }

            let has_new_gap = prev_batch.is_some();

            if !self.state.waited_for_initial_prev_token.load(Ordering::SeqCst) && has_new_gap {
                self.state.waited_for_initial_prev_token.store(true, Ordering::SeqCst);
            }

            self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;

            self.state
                .room_linked_chunk
                .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);

            self.post_process_new_events(events, true).await?;

            if timeline.limited && has_new_gap {
                self.shrink_to_last_chunk().await?;
            }

            let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

            Ok((has_new_gap, timeline_event_diffs))
        }

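        /// Handles the result of a network back-pagination: deduplicates the
        /// received events, resolves the paginated gap, and returns the
        /// back-pagination outcome plus the resulting timeline diffs, or
        /// `None` if the gap being paginated can't be found anymore.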
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn handle_backpagination(
            &mut self,
            events: Vec<Event>,
            mut new_token: Option<String>,
            prev_token: Option<String>,
        ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
        {
            let prev_gap_id = if let Some(token) = prev_token {
                let gap_chunk_id = self.state.room_linked_chunk.chunk_identifier(|chunk| {
                    matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
                });

                if gap_chunk_id.is_none() {
                    return Ok(None);
                }

                gap_chunk_id
            } else {
                None
            };

            let DeduplicationOutcome {
                all_events: mut events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
                non_empty_all_duplicates: all_duplicates,
            } = filter_duplicate_events(
                &self.store,
                LinkedChunkId::Room(&self.state.room_id),
                &self.state.room_linked_chunk,
                events,
            )
            .await?;

            if !all_duplicates {
                self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                    .await?;
            } else {
                events.clear();
                new_token = None;
            }

            let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();

            let new_gap = new_token.map(|prev_token| Gap { prev_token });
            let reached_start = self.state.room_linked_chunk.finish_back_pagination(
                prev_gap_id,
                new_gap,
                &topo_ordered_events,
            );

            self.post_process_new_events(topo_ordered_events, false).await?;

            let event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

            Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
        }

        pub fn subscribe_to_thread(
            &mut self,
            root: OwnedEventId,
        ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
            self.get_or_reload_thread(root).subscribe()
        }

        pub fn finish_thread_network_pagination(
            &mut self,
            root: OwnedEventId,
            prev_token: Option<String>,
            new_token: Option<String>,
            events: Vec<Event>,
        ) -> Option<BackPaginationOutcome> {
            self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
        }

        pub fn load_more_thread_events_backwards(
            &mut self,
            root: OwnedEventId,
        ) -> LoadMoreEventsBackwardsOutcome {
            self.get_or_reload_thread(root).load_more_events_backwards()
        }

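        /// Post-processes newly added events: persists pending linked chunk
        /// updates, applies redactions, and routes thread-related events to
        /// their thread caches.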
        pub(in super::super) async fn post_process_new_events(
            &mut self,
            events: Vec<Event>,
            is_sync: bool,
        ) -> Result<(), EventCacheError> {
            self.propagate_changes().await?;

            let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();

            for event in events {
                self.maybe_apply_new_redaction(&event).await?;

                if self.state.enabled_thread_support {
                    if is_sync {
                        if let Some(thread_root) = extract_thread_root(event.raw()) {
                            new_events_by_thread
                                .entry(thread_root)
                                .or_default()
                                .push(event.clone());
                        } else if let Some(event_id) = event.event_id() {
                            if self.state.threads.contains_key(&event_id) {
                                new_events_by_thread
                                    .entry(event_id)
                                    .or_default()
                                    .push(event.clone());
                            }
                        }
                    }

                    if let Some(edit_target) = extract_edit_target(event.raw()) {
                        if let Some((_location, edit_target_event)) =
                            self.find_event(&edit_target).await?
                            && let Some(thread_root) = extract_thread_root(edit_target_event.raw())
                        {
                            new_events_by_thread.entry(thread_root).or_default();
                        }
                    }
                }

                if let Some(bundled_thread) = event.bundled_latest_thread_event {
                    self.save_events([*bundled_thread]).await?;
                }
            }

            if self.state.enabled_thread_support {
                self.update_threads(new_events_by_thread).await?;
            }

            Ok(())
        }

        fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
            let room_id = self.state.room_id.clone();
            let linked_chunk_update_sender = self.state.linked_chunk_update_sender.clone();

            self.state.threads.entry(root_event_id.clone()).or_insert_with(|| {
                ThreadEventCache::new(room_id, root_event_id, linked_chunk_update_sender)
            })
        }

        #[instrument(skip_all)]
        async fn update_threads(
            &mut self,
            new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
        ) -> Result<(), EventCacheError> {
            for (thread_root, new_events) in new_events_by_thread {
                let thread_cache = self.get_or_reload_thread(thread_root.clone());

                thread_cache.add_live_events(new_events);

                let mut latest_event_id = thread_cache.latest_event_id();

                if let Some(event_id) = latest_event_id.as_ref()
                    && let Some((_, edits)) = self
                        .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
                        .await?
                    && let Some(latest_edit) = edits.last()
                {
                    latest_event_id = latest_edit.event_id();
                }

                self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
            }

            Ok(())
        }

        async fn maybe_update_thread_summary(
            &mut self,
            thread_root: OwnedEventId,
            latest_event_id: Option<OwnedEventId>,
        ) -> Result<(), EventCacheError> {
            let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
                trace!(%thread_root, "thread root event is missing from the room linked chunk");
                return Ok(());
            };

            let prev_summary = target_event.thread_summary.summary();

            let num_replies = {
                let thread_replies = self
                    .store
                    .find_event_relations(
                        &self.state.room_id,
                        &thread_root,
                        Some(&[RelationType::Thread]),
                    )
                    .await?;
                thread_replies.len().try_into().unwrap_or(u32::MAX)
            };

            let new_summary = if num_replies > 0 {
                Some(ThreadSummary { num_replies, latest_reply: latest_event_id })
            } else {
                None
            };

            if prev_summary == new_summary.as_ref() {
                trace!(%thread_root, "thread summary is already up-to-date");
                return Ok(());
            }

            trace!(%thread_root, "updating thread summary: {new_summary:?}");
            target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary);
            self.replace_event_at(location, target_event).await
        }

        pub(crate) async fn replace_event_at(
            &mut self,
            location: EventLocation,
            event: Event,
        ) -> Result<(), EventCacheError> {
            match location {
                EventLocation::Memory(position) => {
                    self.state
                        .room_linked_chunk
                        .replace_event_at(position, event)
                        .expect("should have been a valid position of an item");
                    self.propagate_changes().await?;
                }
                EventLocation::Store => {
                    self.save_events([event]).await?;
                }
            }

            Ok(())
        }

        #[instrument(skip_all)]
        async fn maybe_apply_new_redaction(
            &mut self,
            event: &Event,
        ) -> Result<(), EventCacheError> {
            let raw_event = event.raw();

            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
                raw_event.get_field::<MessageLikeEventType>("type")
            else {
                return Ok(());
            };

            let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
                redaction,
            ))) = raw_event.deserialize()
            else {
                return Ok(());
            };

            let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else {
                warn!("missing target event id from the redaction event");
                return Ok(());
            };

            let Some((location, mut target_event)) = self.find_event(event_id).await? else {
                trace!("redacted event is missing from the linked chunk");
                return Ok(());
            };

            let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
                match deserialized {
                    AnySyncTimelineEvent::MessageLike(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                    AnySyncTimelineEvent::State(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                }

                extract_thread_root(target_event.raw())
            } else {
                warn!("failed to deserialize the event to redact");
                None
            };

            if let Some(redacted_event) = apply_redaction(
                target_event.raw(),
                event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
                &self.state.room_version_rules.redaction,
            ) {
                target_event.replace_raw(redacted_event.cast_unchecked());

                self.replace_event_at(location, target_event).await?;

                if let Some(thread_root) = thread_root
                    && let Some(thread_cache) = self.state.threads.get_mut(&thread_root)
                {
                    thread_cache.remove_if_present(event_id);

                    let latest_event_id = thread_cache.latest_event_id();
                    self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
                }
            }

            Ok(())
        }

        pub async fn save_events(
            &mut self,
            events: impl IntoIterator<Item = Event>,
        ) -> Result<(), EventCacheError> {
            let store = self.store.clone();
            let room_id = self.state.room_id.clone();
            let events = events.into_iter().collect::<Vec<_>>();

            spawn(async move {
                for event in events {
                    store.save_event(&room_id, event).await?;
                }
                super::Result::Ok(())
            })
            .await
            .expect("joining failed")?;

            Ok(())
        }

        #[cfg(test)]
        pub fn is_dirty(&self) -> bool {
            EventCacheStoreLockGuard::is_dirty(&self.store)
        }
    }

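    /// Loads the metadata of all chunks of a linked chunk from the store, and
    /// checks that they form a single, cycle-free chain; the chunks are
    /// returned in topological order.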
    async fn load_linked_chunk_metadata(
        store_guard: &EventCacheStoreLockGuard,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
        let mut all_chunks = store_guard
            .load_all_chunks_metadata(linked_chunk_id)
            .await
            .map_err(EventCacheError::from)?;

        if all_chunks.is_empty() {
            return Ok(None);
        }

        let chunk_map: HashMap<_, _> =
            all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();

        let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
        let Some(last) = iter.next() else {
            return Err(EventCacheError::InvalidLinkedChunkMetadata {
                details: "no last chunk found".to_owned(),
            });
        };

        if let Some(other_last) = iter.next() {
            return Err(EventCacheError::InvalidLinkedChunkMetadata {
                details: format!(
                    "chunks {} and {} both claim to be last chunks",
                    last.identifier.index(),
                    other_last.identifier.index()
                ),
            });
        }

        let mut seen = HashSet::new();
        let mut current = last;
        loop {
            if !seen.insert(current.identifier) {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "cycle detected in linked chunk at {}",
                        current.identifier.index()
                    ),
                });
            }

            let Some(prev_id) = current.previous else {
                if seen.len() != all_chunks.len() {
                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
                        details: format!(
                            "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
                            seen.len(),
                            all_chunks.len()
                        ),
                    });
                }
                break;
            };

            let Some(pred_meta) = chunk_map.get(&prev_id) else {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "missing predecessor {} chunk for {}",
                        prev_id.index(),
                        current.identifier.index()
                    ),
                });
            };

            if pred_meta.next != Some(current.identifier) {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
                        pred_meta.identifier.index(),
                        pred_meta.next.map(|chunk_id| chunk_id.index()),
                        current.identifier.index()
                    ),
                });
            }

            current = *pred_meta;
        }

        let mut current = current.identifier;
        for i in 0..all_chunks.len() {
            let j = all_chunks
                .iter()
                .rev()
                .position(|meta| meta.identifier == current)
                .map(|j| all_chunks.len() - 1 - j)
                .expect("the target chunk must be present in the metadata");
            if i != j {
                all_chunks.swap(i, j);
            }
            if let Some(next) = all_chunks[i].next {
                current = next;
            }
        }

        Ok(Some(all_chunks))
    }

    fn strip_relations_if_present<T>(event: &mut Raw<T>) {
        let mut closure = || -> Option<()> {
            let mut val: serde_json::Value = event.deserialize_as().ok()?;
            let unsigned = val.get_mut("unsigned")?;
            let unsigned_obj = unsigned.as_object_mut()?;
            if unsigned_obj.remove("m.relations").is_some() {
                *event = Raw::new(&val).ok()?.cast_unchecked();
            }
            None
        };
        let _ = closure();
    }

    fn strip_relations_from_event(ev: &mut Event) {
        match &mut ev.kind {
            TimelineEventKind::Decrypted(decrypted) => {
                decrypted.unsigned_encryption_info = None;

                strip_relations_if_present(&mut decrypted.event);
            }

            TimelineEventKind::UnableToDecrypt { event, .. }
            | TimelineEventKind::PlainText { event } => {
                strip_relations_if_present(event);
            }
        }
    }

    fn strip_relations_from_events(items: &mut [Event]) {
        for ev in items.iter_mut() {
            strip_relations_from_event(ev);
        }
    }

    async fn find_event(
        event_id: &EventId,
        room_id: &RoomId,
        room_linked_chunk: &EventLinkedChunk,
        store: &EventCacheStoreLockGuard,
    ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
        for (position, event) in room_linked_chunk.revents() {
            if event.event_id().as_deref() == Some(event_id) {
                return Ok(Some((EventLocation::Memory(position), event.clone())));
            }
        }

        Ok(store.find_event(room_id, event_id).await?.map(|event| (EventLocation::Store, event)))
    }

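    /// Finds an event in the store, along with the transitive closure of its
    /// related events, optionally filtered by relation type; related events
    /// are sorted by their order in the room's linked chunk when known.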
2272 async fn find_event_with_relations(
2276 event_id: &EventId,
2277 room_id: &RoomId,
2278 filters: Option<Vec<RelationType>>,
2279 room_linked_chunk: &EventLinkedChunk,
2280 store: &EventCacheStoreLockGuard,
2281 ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
2282 let found = store.find_event(room_id, event_id).await?;
2284
2285 let Some(target) = found else {
2286 return Ok(None);
2288 };
2289
2290 let mut related = store.find_event_relations(room_id, event_id, filters.as_deref()).await?;
2293 let mut stack =
2294 related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();
2295
2296 let mut already_seen = HashSet::new();
2299 already_seen.insert(event_id.to_owned());
2300
2301 let mut num_iters = 1;
2302
2303 while let Some(event_id) = stack.pop() {
2305 if !already_seen.insert(event_id.clone()) {
2306 continue;
2308 }
2309
2310 let other_related =
2311 store.find_event_relations(room_id, &event_id, filters.as_deref()).await?;
2312
2313 stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
2314 related.extend(other_related);
2315
2316 num_iters += 1;
2317 }
2318
2319 trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
2320
2321 related.sort_by(|(_, lhs), (_, rhs)| {
2325 use std::cmp::Ordering;
2326
2327 match (lhs, rhs) {
2328 (None, None) => Ordering::Equal,
2329 (None, Some(_)) => Ordering::Less,
2330 (Some(_), None) => Ordering::Greater,
2331 (Some(lhs), Some(rhs)) => {
2332 let lhs = room_linked_chunk.event_order(*lhs);
2333 let rhs = room_linked_chunk.event_order(*rhs);
2334
2335 match (lhs, rhs) {
2339 (None, None) => Ordering::Equal,
2340 (None, Some(_)) => Ordering::Less,
2341 (Some(_), None) => Ordering::Greater,
2342 (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
2343 }
2344 }
2345 }
2346 });
2347
2348 let related = related.into_iter().map(|(event, _pos)| event).collect();
2350
2351 Ok(Some((target, related)))
2352 }
2353}
2354
2355pub(super) enum EventLocation {
2357 Memory(Position),
2359
2360 Store,
2362}
2363
2364pub(super) use private::RoomEventCacheStateLock;
2365
2366#[cfg(test)]
2367mod tests {
2368 use matrix_sdk_base::event_cache::Event;
2369 use matrix_sdk_test::{async_test, event_factory::EventFactory};
2370 use ruma::{
2371 RoomId, event_id,
2372 events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2373 room_id, user_id,
2374 };
2375
2376 use crate::test_utils::logged_in_client;
2377
2378 #[async_test]
2379 async fn test_find_event_by_id_with_edit_relation() {
2380 let original_id = event_id!("$original");
2381 let related_id = event_id!("$related");
2382 let room_id = room_id!("!galette:saucisse.bzh");
2383 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2384
2385 assert_relations(
2386 room_id,
2387 f.text_msg("Original event").event_id(original_id).into(),
2388 f.text_msg("* An edited event")
2389 .edit(
2390 original_id,
2391 RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
2392 )
2393 .event_id(related_id)
2394 .into(),
2395 f,
2396 )
2397 .await;
2398 }
2399
2400 #[async_test]
2401 async fn test_find_event_by_id_with_thread_reply_relation() {
2402 let original_id = event_id!("$original");
2403 let related_id = event_id!("$related");
2404 let room_id = room_id!("!galette:saucisse.bzh");
2405 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2406
2407 assert_relations(
2408 room_id,
2409 f.text_msg("Original event").event_id(original_id).into(),
2410 f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
2411 f,
2412 )
2413 .await;
2414 }
2415
2416 #[async_test]
2417 async fn test_find_event_by_id_with_reaction_relation() {
2418 let original_id = event_id!("$original");
2419 let related_id = event_id!("$related");
2420 let room_id = room_id!("!galette:saucisse.bzh");
2421 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2422
2423 assert_relations(
2424 room_id,
2425 f.text_msg("Original event").event_id(original_id).into(),
2426 f.reaction(original_id, ":D").event_id(related_id).into(),
2427 f,
2428 )
2429 .await;
2430 }
2431
2432 #[async_test]
2433 async fn test_find_event_by_id_with_poll_response_relation() {
2434 let original_id = event_id!("$original");
2435 let related_id = event_id!("$related");
2436 let room_id = room_id!("!galette:saucisse.bzh");
2437 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2438
2439 assert_relations(
2440 room_id,
2441 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2442 .event_id(original_id)
2443 .into(),
2444 f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
2445 f,
2446 )
2447 .await;
2448 }
2449
2450 #[async_test]
2451 async fn test_find_event_by_id_with_poll_end_relation() {
2452 let original_id = event_id!("$original");
2453 let related_id = event_id!("$related");
2454 let room_id = room_id!("!galette:saucisse.bzh");
2455 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2456
2457 assert_relations(
2458 room_id,
2459 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2460 .event_id(original_id)
2461 .into(),
2462 f.poll_end("Poll ended", original_id).event_id(related_id).into(),
2463 f,
2464 )
2465 .await;
2466 }
2467
2468 #[async_test]
2469 async fn test_find_event_by_id_with_filtered_relationships() {
2470 let original_id = event_id!("$original");
2471 let related_id = event_id!("$related");
2472 let associated_related_id = event_id!("$recursive_related");
2473 let room_id = room_id!("!galette:saucisse.bzh");
2474 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2475
2476 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2477 let related_event = event_factory
2478 .text_msg("* Edited event")
2479 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2480 .event_id(related_id)
2481 .into();
2482 let associated_related_event =
2483 event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
2484
2485 let client = logged_in_client(None).await;
2486
2487 let event_cache = client.event_cache();
2488 event_cache.subscribe().unwrap();
2489
2490 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2491 let room = client.get_room(room_id).unwrap();
2492
2493 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2494
2495 room_event_cache.save_events([original_event]).await;
2497
2498 room_event_cache.save_events([related_event]).await;
2500
2501 room_event_cache.save_events([associated_related_event]).await;
2503
2504 let filter = Some(vec![RelationType::Replacement]);
2505 let (event, related_events) = room_event_cache
2506 .find_event_with_relations(original_id, filter)
2507 .await
2508 .expect("Failed to find the event with relations")
2509 .expect("Event has no relation");
2510 let cached_event_id = event.event_id().unwrap();
2512 assert_eq!(cached_event_id, original_id);
2513
2514 assert_eq!(related_events.len(), 1);
2516
2517 let related_event_id = related_events[0].event_id().unwrap();
2518 assert_eq!(related_event_id, related_id);
2519
2520 let filter = Some(vec![RelationType::Thread]);
2522 let (event, related_events) = room_event_cache
2523 .find_event_with_relations(original_id, filter)
2524 .await
2525 .expect("Failed to find the event with relations")
2526 .expect("Event has no relation");
2527
2528 let cached_event_id = event.event_id().unwrap();
2530 assert_eq!(cached_event_id, original_id);
2531 assert!(related_events.is_empty());
2533 }
2534
2535 #[async_test]
2536 async fn test_find_event_by_id_with_recursive_relation() {
2537 let original_id = event_id!("$original");
2538 let related_id = event_id!("$related");
2539 let associated_related_id = event_id!("$recursive_related");
2540 let room_id = room_id!("!galette:saucisse.bzh");
2541 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2542
2543 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2544 let related_event = event_factory
2545 .text_msg("* Edited event")
2546 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2547 .event_id(related_id)
2548 .into();
2549 let associated_related_event =
2550 event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
2551
2552 let client = logged_in_client(None).await;
2553
2554 let event_cache = client.event_cache();
2555 event_cache.subscribe().unwrap();
2556
2557 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2558 let room = client.get_room(room_id).unwrap();
2559
2560 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2561
2562 room_event_cache.save_events([original_event]).await;
2564
2565 room_event_cache.save_events([related_event]).await;
2567
2568 room_event_cache.save_events([associated_related_event]).await;
2570
2571 let (event, related_events) = room_event_cache
2572 .find_event_with_relations(original_id, None)
2573 .await
2574 .expect("Failed to find the event with relations")
2575 .expect("Event has no relation");
2576 let cached_event_id = event.event_id().unwrap();
2578 assert_eq!(cached_event_id, original_id);
2579
2580 assert_eq!(related_events.len(), 2);
2582
2583 let related_event_id = related_events[0].event_id().unwrap();
2584 assert_eq!(related_event_id, related_id);
2585 let related_event_id = related_events[1].event_id().unwrap();
2586 assert_eq!(related_event_id, associated_related_id);
2587 }
2588
2589 async fn assert_relations(
2590 room_id: &RoomId,
2591 original_event: Event,
2592 related_event: Event,
2593 event_factory: EventFactory,
2594 ) {
2595 let client = logged_in_client(None).await;
2596
2597 let event_cache = client.event_cache();
2598 event_cache.subscribe().unwrap();
2599
2600 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2601 let room = client.get_room(room_id).unwrap();
2602
2603 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2604
2605 let original_event_id = original_event.event_id().unwrap();
2607 room_event_cache.save_events([original_event]).await;
2608
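// Also save an unrelated event; it must not show up among the relations.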
2609 let unrelated_id = event_id!("$2");
2611 room_event_cache
2612 .save_events([event_factory
2613 .text_msg("An unrelated event")
2614 .event_id(unrelated_id)
2615 .into()])
2616 .await;
2617
2618 let related_id = related_event.event_id().unwrap();
2620 room_event_cache.save_events([related_event]).await;
2621
2622 let (event, related_events) = room_event_cache
2623 .find_event_with_relations(&original_event_id, None)
2624 .await
2625 .expect("Failed to find the event with relations")
2626 .expect("Event has no relation");
2627 let cached_event_id = event.event_id().unwrap();
2629 assert_eq!(cached_event_id, original_event_id);
2630
2631 let related_event_id = related_events[0].event_id().unwrap();
2633 assert_eq!(related_event_id, related_id);
2634 }
2635 }
2636
2637 #[cfg(all(test, not(target_family = "wasm")))]
2638 mod timed_tests {
2639 use std::{ops::Not, sync::Arc};
2640
2641 use assert_matches::assert_matches;
2642 use assert_matches2::assert_let;
2643 use eyeball_im::VectorDiff;
2644 use futures_util::FutureExt;
2645 use matrix_sdk_base::{
2646 event_cache::{
2647 Gap,
2648 store::{EventCacheStore as _, MemoryStore},
2649 },
2650 linked_chunk::{
2651 ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
2652 lazy_loader::from_all_chunks,
2653 },
2654 store::StoreConfig,
2655 sync::{JoinedRoomUpdate, Timeline},
2656 };
2657 use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
2658 use ruma::{
2659 EventId, OwnedUserId, event_id,
2660 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2661 room_id, user_id,
2662 };
2663 use tokio::task::yield_now;
2664
2665 use super::RoomEventCacheGenericUpdate;
2666 use crate::{
2667 assert_let_timeout,
2668 event_cache::{RoomEventCache, RoomEventCacheUpdate, room::LoadMoreEventsBackwardsOutcome},
2669 test_utils::client::MockClientBuilder,
2670 };
2671
2672 #[async_test]
2673 async fn test_write_to_storage() {
2674 let room_id = room_id!("!galette:saucisse.bzh");
2675 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2676
2677 let event_cache_store = Arc::new(MemoryStore::new());
2678
2679 let client = MockClientBuilder::new(None)
2680 .on_builder(|builder| {
2681 builder.store_config(
2682 StoreConfig::new("hodlor".to_owned())
2683 .event_cache_store(event_cache_store.clone()),
2684 )
2685 })
2686 .build()
2687 .await;
2688
2689 let event_cache = client.event_cache();
2690
2691 event_cache.subscribe().unwrap();
2693
2694 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2695 let room = client.get_room(room_id).unwrap();
2696
2697 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2698 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2699
2700 let timeline = Timeline {
2702 limited: true,
2703 prev_batch: Some("raclette".to_owned()),
2704 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
2705 };
2706
2707 room_event_cache
2708 .inner
2709 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2710 .await
2711 .unwrap();
2712
2713 assert_matches!(
2715 generic_stream.recv().await,
2716 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2717 assert_eq!(expected_room_id, room_id);
2718 }
2719 );
2720
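// Inspect the store directly: the linked chunk should be a gap chunk followed by a chunk holding the single event.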
2721 let linked_chunk = from_all_chunks::<3, _, _>(
2723 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2724 )
2725 .unwrap()
2726 .unwrap();
2727
2728 assert_eq!(linked_chunk.chunks().count(), 2);
2729
2730 let mut chunks = linked_chunk.chunks();
2731
2732 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
2734 assert_eq!(gap.prev_token, "raclette");
2735 });
2736
2737 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2739 assert_eq!(events.len(), 1);
2740 let deserialized = events[0].raw().deserialize().unwrap();
2741 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
2742 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
2743 });
2744
2745 assert!(chunks.next().is_none());
2747 }
2748
2749 #[async_test]
2750 async fn test_write_to_storage_strips_bundled_relations() {
2751 let room_id = room_id!("!galette:saucisse.bzh");
2752 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2753
2754 let event_cache_store = Arc::new(MemoryStore::new());
2755
2756 let client = MockClientBuilder::new(None)
2757 .on_builder(|builder| {
2758 builder.store_config(
2759 StoreConfig::new("hodlor".to_owned())
2760 .event_cache_store(event_cache_store.clone()),
2761 )
2762 })
2763 .build()
2764 .await;
2765
2766 let event_cache = client.event_cache();
2767
2768 event_cache.subscribe().unwrap();
2770
2771 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2772 let room = client.get_room(room_id).unwrap();
2773
2774 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2775 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2776
2777 let ev = f
2779 .text_msg("hey yo")
2780 .sender(*ALICE)
2781 .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
2782 .into_event();
2783
2784 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
2785
2786 room_event_cache
2787 .inner
2788 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2789 .await
2790 .unwrap();
2791
2792 assert_matches!(
2794 generic_stream.recv().await,
2795 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2796 assert_eq!(expected_room_id, room_id);
2797 }
2798 );
2799
2800 {
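// The in-memory copy of the event still carries its bundled edit.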
2802 let events = room_event_cache.events().await.unwrap();
2803
2804 assert_eq!(events.len(), 1);
2805
2806 let ev = events[0].raw().deserialize().unwrap();
2807 assert_let!(
2808 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
2809 );
2810
2811 let original = msg.as_original().unwrap();
2812 assert_eq!(original.content.body(), "hey yo");
2813 assert!(original.unsigned.relations.replace.is_some());
2814 }
2815
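// The copy persisted in the store, however, must have its bundled relations stripped.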
2816 let linked_chunk = from_all_chunks::<3, _, _>(
2818 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2819 )
2820 .unwrap()
2821 .unwrap();
2822
2823 assert_eq!(linked_chunk.chunks().count(), 1);
2824
2825 let mut chunks = linked_chunk.chunks();
2826 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2827 assert_eq!(events.len(), 1);
2828
2829 let ev = events[0].raw().deserialize().unwrap();
2830 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
2831
2832 let original = msg.as_original().unwrap();
2833 assert_eq!(original.content.body(), "hey yo");
2834 assert!(original.unsigned.relations.replace.is_none());
2835 });
2836
2837 assert!(chunks.next().is_none());
2839 }
2840
2841 #[async_test]
2842 async fn test_clear() {
2843 let room_id = room_id!("!galette:saucisse.bzh");
2844 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2845
2846 let event_cache_store = Arc::new(MemoryStore::new());
2847
2848 let event_id1 = event_id!("$1");
2849 let event_id2 = event_id!("$2");
2850
2851 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2852 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2853
2854 event_cache_store
2856 .handle_linked_chunk_updates(
2857 LinkedChunkId::Room(room_id),
2858 vec![
2859 Update::NewItemsChunk {
2861 previous: None,
2862 new: ChunkIdentifier::new(0),
2863 next: None,
2864 },
2865 Update::NewGapChunk {
2867 previous: Some(ChunkIdentifier::new(0)),
2868 new: ChunkIdentifier::new(42),
2870 next: None,
2871 gap: Gap { prev_token: "comté".to_owned() },
2872 },
2873 Update::NewItemsChunk {
2875 previous: Some(ChunkIdentifier::new(42)),
2876 new: ChunkIdentifier::new(1),
2877 next: None,
2878 },
2879 Update::PushItems {
2880 at: Position::new(ChunkIdentifier::new(1), 0),
2881 items: vec![ev1.clone()],
2882 },
2883 Update::NewItemsChunk {
2885 previous: Some(ChunkIdentifier::new(1)),
2886 new: ChunkIdentifier::new(2),
2887 next: None,
2888 },
2889 Update::PushItems {
2890 at: Position::new(ChunkIdentifier::new(2), 0),
2891 items: vec![ev2.clone()],
2892 },
2893 ],
2894 )
2895 .await
2896 .unwrap();
2897
2898 let client = MockClientBuilder::new(None)
2899 .on_builder(|builder| {
2900 builder.store_config(
2901 StoreConfig::new("hodlor".to_owned())
2902 .event_cache_store(event_cache_store.clone()),
2903 )
2904 })
2905 .build()
2906 .await;
2907
2908 let event_cache = client.event_cache();
2909
2910 event_cache.subscribe().unwrap();
2912
2913 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2914 let room = client.get_room(room_id).unwrap();
2915
2916 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2917
2918 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
2919 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2920
2921 {
2923 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
2924 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
2925 }
2926
2927 {
2929 assert_eq!(items.len(), 1);
2931 assert_eq!(items[0].event_id().unwrap(), event_id2);
2932
2933 assert!(stream.is_empty());
2934 }
2935
2936 {
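// Paginate backwards once, to load the first event into memory.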
2938 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2939
2940 assert_let_timeout!(
2941 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2942 );
2943 assert_eq!(diffs.len(), 1);
2944 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2945 assert_eq!(event.event_id().unwrap(), event_id1);
2947 });
2948
2949 assert!(stream.is_empty());
2950 }
2951
2952 room_event_cache.clear().await.unwrap();
2954
2955 assert_let_timeout!(
2957 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2958 );
2959 assert_eq!(diffs.len(), 1);
2960 assert_let!(VectorDiff::Clear = &diffs[0]);
2961
2962 assert_let_timeout!(
2964 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
2965 );
2966 assert_eq!(received_room_id, room_id);
2967
2968 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
2971
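// But the room's in-memory timeline is now empty.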
2972 let items = room_event_cache.events().await.unwrap();
2974 assert!(items.is_empty());
2975
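// And the stored linked chunk doesn't contain any events either.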
2976 let linked_chunk = from_all_chunks::<3, _, _>(
2978 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2979 )
2980 .unwrap()
2981 .unwrap();
2982
2983 assert_eq!(linked_chunk.num_items(), 0);
2987 }
2988
2989 #[async_test]
2990 async fn test_load_from_storage() {
2991 let room_id = room_id!("!galette:saucisse.bzh");
2992 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2993
2994 let event_cache_store = Arc::new(MemoryStore::new());
2995
2996 let event_id1 = event_id!("$1");
2997 let event_id2 = event_id!("$2");
2998
2999 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
3000 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
3001
3002 event_cache_store
3004 .handle_linked_chunk_updates(
3005 LinkedChunkId::Room(room_id),
3006 vec![
3007 Update::NewItemsChunk {
3009 previous: None,
3010 new: ChunkIdentifier::new(0),
3011 next: None,
3012 },
3013 Update::NewGapChunk {
3015 previous: Some(ChunkIdentifier::new(0)),
3016 new: ChunkIdentifier::new(42),
3018 next: None,
3019 gap: Gap { prev_token: "cheddar".to_owned() },
3020 },
3021 Update::NewItemsChunk {
3023 previous: Some(ChunkIdentifier::new(42)),
3024 new: ChunkIdentifier::new(1),
3025 next: None,
3026 },
3027 Update::PushItems {
3028 at: Position::new(ChunkIdentifier::new(1), 0),
3029 items: vec![ev1.clone()],
3030 },
3031 Update::NewItemsChunk {
3033 previous: Some(ChunkIdentifier::new(1)),
3034 new: ChunkIdentifier::new(2),
3035 next: None,
3036 },
3037 Update::PushItems {
3038 at: Position::new(ChunkIdentifier::new(2), 0),
3039 items: vec![ev2.clone()],
3040 },
3041 ],
3042 )
3043 .await
3044 .unwrap();
3045
3046 let client = MockClientBuilder::new(None)
3047 .on_builder(|builder| {
3048 builder.store_config(
3049 StoreConfig::new("hodlor".to_owned())
3050 .event_cache_store(event_cache_store.clone()),
3051 )
3052 })
3053 .build()
3054 .await;
3055
3056 let event_cache = client.event_cache();
3057
3058 event_cache.subscribe().unwrap();
3060
3061 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3063
3064 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3065 let room = client.get_room(room_id).unwrap();
3066
3067 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3068
3069 assert_matches!(
3072 generic_stream.recv().await,
3073 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3074 assert_eq!(room_id, expected_room_id);
3075 }
3076 );
3077
3078 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
3079
3080 assert_eq!(items.len(), 1);
3083 assert_eq!(items[0].event_id().unwrap(), event_id2);
3084 assert!(stream.is_empty());
3085
3086 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
3088 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
3089
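// Paginating backwards loads the first event from the store.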
3090 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3092
3093 assert_let_timeout!(
3094 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3095 );
3096 assert_eq!(diffs.len(), 1);
3097 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
3098 assert_eq!(event.event_id().unwrap(), event_id1);
3099 });
3100
3101 assert!(stream.is_empty());
3102
3103 assert_matches!(
3105 generic_stream.recv().await,
3106 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3107 assert_eq!(expected_room_id, room_id);
3108 }
3109 );
3110
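// Sync the second event again: it's already known, and must not trigger a new generic update.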
3111 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
3113
3114 room_event_cache
3115 .inner
3116 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
3117 .await
3118 .unwrap();
3119
3120 assert!(generic_stream.recv().now_or_never().is_none());
3123
3124 let items = room_event_cache.events().await.unwrap();
3129 assert_eq!(items.len(), 2);
3130 assert_eq!(items[0].event_id().unwrap(), event_id1);
3131 assert_eq!(items[1].event_id().unwrap(), event_id2);
3132 }
3133
3134 #[async_test]
3135 async fn test_load_from_storage_resilient_to_failure() {
3136 let room_id = room_id!("!fondue:patate.ch");
3137 let event_cache_store = Arc::new(MemoryStore::new());
3138
3139 let event = EventFactory::new()
3140 .room(room_id)
3141 .sender(user_id!("@ben:saucisse.bzh"))
3142 .text_msg("foo")
3143 .event_id(event_id!("$42"))
3144 .into_event();
3145
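// Prefill the store with a deliberately malformed linked chunk (the second chunk points back to the first), so that loading it fails.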
3146 event_cache_store
3148 .handle_linked_chunk_updates(
3149 LinkedChunkId::Room(room_id),
3150 vec![
3151 Update::NewItemsChunk {
3152 previous: None,
3153 new: ChunkIdentifier::new(0),
3154 next: None,
3155 },
3156 Update::PushItems {
3157 at: Position::new(ChunkIdentifier::new(0), 0),
3158 items: vec![event],
3159 },
3160 Update::NewItemsChunk {
3161 previous: Some(ChunkIdentifier::new(0)),
3162 new: ChunkIdentifier::new(1),
3163 next: Some(ChunkIdentifier::new(0)),
3164 },
3165 ],
3166 )
3167 .await
3168 .unwrap();
3169
3170 let client = MockClientBuilder::new(None)
3171 .on_builder(|builder| {
3172 builder.store_config(
3173 StoreConfig::new("holder".to_owned())
3174 .event_cache_store(event_cache_store.clone()),
3175 )
3176 })
3177 .build()
3178 .await;
3179
3180 let event_cache = client.event_cache();
3181
3182 event_cache.subscribe().unwrap();
3184
3185 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3186 let room = client.get_room(room_id).unwrap();
3187
3188 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3189
3190 let items = room_event_cache.events().await.unwrap();
3191
3192 assert!(items.is_empty());
3195
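// The broken chunks have been removed from the store, leaving it empty for this room.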
3196 let raw_chunks =
3199 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
3200 assert!(raw_chunks.is_empty());
3201 }
3202
3203 #[async_test]
3204 async fn test_no_useless_gaps() {
3205 let room_id = room_id!("!galette:saucisse.bzh");
3206
3207 let client = MockClientBuilder::new(None).build().await;
3208
3209 let event_cache = client.event_cache();
3210 event_cache.subscribe().unwrap();
3211
3212 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3213 let room = client.get_room(room_id).unwrap();
3214 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3215 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3216
3217 let f = EventFactory::new().room(room_id).sender(*ALICE);
3218
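// Sync a limited timeline (one event plus a prev-batch token) into the empty room.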
3219 room_event_cache
3222 .inner
3223 .handle_joined_room_update(JoinedRoomUpdate {
3224 timeline: Timeline {
3225 limited: true,
3226 prev_batch: Some("raclette".to_owned()),
3227 events: vec![f.text_msg("hey yo").into_event()],
3228 },
3229 ..Default::default()
3230 })
3231 .await
3232 .unwrap();
3233
3234 assert_matches!(
3236 generic_stream.recv().await,
3237 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3238 assert_eq!(expected_room_id, room_id);
3239 }
3240 );
3241
3242 {
3243 let mut state = room_event_cache.inner.state.write().await.unwrap();
3244
3245 let mut num_gaps = 0;
3246 let mut num_events = 0;
3247
3248 for c in state.room_linked_chunk().chunks() {
3249 match c.content() {
3250 ChunkContent::Items(items) => num_events += items.len(),
3251 ChunkContent::Gap(_) => num_gaps += 1,
3252 }
3253 }
3254
3255 assert_eq!(num_gaps, 0);
3258 assert_eq!(num_events, 1);
3259
3260 assert_matches!(
3262 state.load_more_events_backwards().await.unwrap(),
3263 LoadMoreEventsBackwardsOutcome::Gap { .. }
3264 );
3265
3266 num_gaps = 0;
3267 num_events = 0;
3268 for c in state.room_linked_chunk().chunks() {
3269 match c.content() {
3270 ChunkContent::Items(items) => num_events += items.len(),
3271 ChunkContent::Gap(_) => num_gaps += 1,
3272 }
3273 }
3274
3275 assert_eq!(num_gaps, 1);
3277 assert_eq!(num_events, 1);
3278 }
3279
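// A non-limited sync with a prev-batch token must not introduce a new gap.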
3280 room_event_cache
3283 .inner
3284 .handle_joined_room_update(JoinedRoomUpdate {
3285 timeline: Timeline {
3286 limited: false,
3287 prev_batch: Some("fondue".to_owned()),
3288 events: vec![f.text_msg("sup").into_event()],
3289 },
3290 ..Default::default()
3291 })
3292 .await
3293 .unwrap();
3294
3295 assert_matches!(
3297 generic_stream.recv().await,
3298 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3299 assert_eq!(expected_room_id, room_id);
3300 }
3301 );
3302
3303 {
3304 let state = room_event_cache.inner.state.read().await.unwrap();
3305
3306 let mut num_gaps = 0;
3307 let mut num_events = 0;
3308
3309 for c in state.room_linked_chunk().chunks() {
3310 match c.content() {
3311 ChunkContent::Items(items) => num_events += items.len(),
3312 ChunkContent::Gap(gap) => {
3313 assert_eq!(gap.prev_token, "raclette");
3314 num_gaps += 1;
3315 }
3316 }
3317 }
3318
3319 assert_eq!(num_gaps, 1);
3321 assert_eq!(num_events, 2);
3322 }
3323 }
3324
3325 #[async_test]
3326 async fn test_shrink_to_last_chunk() {
3327 let room_id = room_id!("!galette:saucisse.bzh");
3328
3329 let client = MockClientBuilder::new(None).build().await;
3330
3331 let f = EventFactory::new().room(room_id);
3332
3333 let evid1 = event_id!("$1");
3334 let evid2 = event_id!("$2");
3335
3336 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3337 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3338
3339 {
3341 client
3342 .event_cache_store()
3343 .lock()
3344 .await
3345 .expect("Could not acquire the event cache lock")
3346 .as_clean()
3347 .expect("Could not acquire a clean event cache lock")
3348 .handle_linked_chunk_updates(
3349 LinkedChunkId::Room(room_id),
3350 vec![
3351 Update::NewItemsChunk {
3352 previous: None,
3353 new: ChunkIdentifier::new(0),
3354 next: None,
3355 },
3356 Update::PushItems {
3357 at: Position::new(ChunkIdentifier::new(0), 0),
3358 items: vec![ev1],
3359 },
3360 Update::NewItemsChunk {
3361 previous: Some(ChunkIdentifier::new(0)),
3362 new: ChunkIdentifier::new(1),
3363 next: None,
3364 },
3365 Update::PushItems {
3366 at: Position::new(ChunkIdentifier::new(1), 0),
3367 items: vec![ev2],
3368 },
3369 ],
3370 )
3371 .await
3372 .unwrap();
3373 }
3374
3375 let event_cache = client.event_cache();
3376 event_cache.subscribe().unwrap();
3377
3378 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3379 let room = client.get_room(room_id).unwrap();
3380 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3381
3382 let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
3384 assert_eq!(events.len(), 1);
3385 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3386 assert!(stream.is_empty());
3387
3388 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3389
3390 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3392 assert_eq!(outcome.events.len(), 1);
3393 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3394 assert!(outcome.reached_start);
3395
3396 assert_let_timeout!(
3398 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3399 );
3400 assert_eq!(diffs.len(), 1);
3401 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3402 assert_eq!(value.event_id().as_deref(), Some(evid1));
3403 });
3404
3405 assert!(stream.is_empty());
3406
3407 assert_let_timeout!(
3409 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
3410 );
3411 assert_eq!(received_room_id, room_id);
3412
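// Force the linked chunk to shrink down to its last chunk.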
3413 let diffs = room_event_cache
3415 .inner
3416 .state
3417 .write()
3418 .await
3419 .unwrap()
3420 .force_shrink_to_last_chunk()
3421 .await
3422 .expect("shrinking should succeed");
3423
3424 assert_eq!(diffs.len(), 2);
3426 assert_matches!(&diffs[0], VectorDiff::Clear);
3427 assert_matches!(&diffs[1], VectorDiff::Append { values } => {
3428 assert_eq!(values.len(), 1);
3429 assert_eq!(values[0].event_id().as_deref(), Some(evid2));
3430 });
3431
3432 assert!(stream.is_empty());
3433
3434 assert!(generic_stream.is_empty());
3436
3437 let events = room_event_cache.events().await.unwrap();
3439 assert_eq!(events.len(), 1);
3440 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3441
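// Paginating backwards after the shrink still retrieves the first event.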
3442 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3445 assert_eq!(outcome.events.len(), 1);
3446 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3447 assert!(outcome.reached_start);
3448 }
3449
3450 #[async_test]
3451 async fn test_room_ordering() {
3452 let room_id = room_id!("!galette:saucisse.bzh");
3453
3454 let client = MockClientBuilder::new(None).build().await;
3455
3456 let f = EventFactory::new().room(room_id).sender(*ALICE);
3457
3458 let evid1 = event_id!("$1");
3459 let evid2 = event_id!("$2");
3460 let evid3 = event_id!("$3");
3461
3462 let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
3463 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3464 let ev3 = f.text_msg("yo").event_id(evid3).into_event();
3465
3466 {
3468 client
3469 .event_cache_store()
3470 .lock()
3471 .await
3472 .expect("Could not acquire the event cache lock")
3473 .as_clean()
3474 .expect("Could not acquire a clean event cache lock")
3475 .handle_linked_chunk_updates(
3476 LinkedChunkId::Room(room_id),
3477 vec![
3478 Update::NewItemsChunk {
3479 previous: None,
3480 new: ChunkIdentifier::new(0),
3481 next: None,
3482 },
3483 Update::PushItems {
3484 at: Position::new(ChunkIdentifier::new(0), 0),
3485 items: vec![ev1, ev2],
3486 },
3487 Update::NewItemsChunk {
3488 previous: Some(ChunkIdentifier::new(0)),
3489 new: ChunkIdentifier::new(1),
3490 next: None,
3491 },
3492 Update::PushItems {
3493 at: Position::new(ChunkIdentifier::new(1), 0),
3494 items: vec![ev3.clone()],
3495 },
3496 ],
3497 )
3498 .await
3499 .unwrap();
3500 }
3501
3502 let event_cache = client.event_cache();
3503 event_cache.subscribe().unwrap();
3504
3505 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3506 let room = client.get_room(room_id).unwrap();
3507 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3508
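// Only the last chunk is loaded in memory, but the ordering of every known event is still available.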
3509 {
3512 let state = room_event_cache.inner.state.read().await.unwrap();
3513 let room_linked_chunk = state.room_linked_chunk();
3514
3515 assert_eq!(
3517 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
3518 Some(0)
3519 );
3520
3521 assert_eq!(
3523 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
3524 Some(1)
3525 );
3526
3527 let mut events = room_linked_chunk.events();
3529 let (pos, ev) = events.next().unwrap();
3530 assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
3531 assert_eq!(ev.event_id().as_deref(), Some(evid3));
3532 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
3533
3534 assert!(events.next().is_none());
3536 }
3537
3538 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3540 assert!(outcome.reached_start);
3541
3542 {
3545 let state = room_event_cache.inner.state.read().await.unwrap();
3546 let room_linked_chunk = state.room_linked_chunk();
3547
3548 for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
3549 assert_eq!(room_linked_chunk.event_order(pos), Some(i));
3550 }
3551 }
3552
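// Handle a limited sync that carries a duplicate of `ev3` plus a brand new event.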
3553 let evid4 = event_id!("$4");
3558 room_event_cache
3559 .inner
3560 .handle_joined_room_update(JoinedRoomUpdate {
3561 timeline: Timeline {
3562 limited: true,
3563 prev_batch: Some("fondue".to_owned()),
3564 events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
3565 },
3566 ..Default::default()
3567 })
3568 .await
3569 .unwrap();
3570
3571 {
3572 let state = room_event_cache.inner.state.read().await.unwrap();
3573 let room_linked_chunk = state.room_linked_chunk();
3574
3575 let mut events = room_linked_chunk.events();
3577
3578 let (pos, ev) = events.next().unwrap();
3579 assert_eq!(ev.event_id().as_deref(), Some(evid3));
3580 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
3581
3582 let (pos, ev) = events.next().unwrap();
3583 assert_eq!(ev.event_id().as_deref(), Some(evid4));
3584 assert_eq!(room_linked_chunk.event_order(pos), Some(3));
3585
3586 assert!(events.next().is_none());
3588
3589 assert_eq!(
3591 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
3592 Some(0)
3593 );
3594 assert_eq!(
3595 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
3596 Some(1)
3597 );
3598
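// The previous position of `ev3` (in chunk 1) no longer maps to any event.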
3599 assert_eq!(
3602 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
3603 None
3604 );
3605 }
3606 }
3607
3608 #[async_test]
3609 async fn test_auto_shrink_after_all_subscribers_are_gone() {
3610 let room_id = room_id!("!galette:saucisse.bzh");
3611
3612 let client = MockClientBuilder::new(None).build().await;
3613
3614 let f = EventFactory::new().room(room_id);
3615
3616 let evid1 = event_id!("$1");
3617 let evid2 = event_id!("$2");
3618
3619 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3620 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3621
3622 {
3624 client
3625 .event_cache_store()
3626 .lock()
3627 .await
3628 .expect("Could not acquire the event cache lock")
3629 .as_clean()
3630 .expect("Could not acquire a clean event cache lock")
3631 .handle_linked_chunk_updates(
3632 LinkedChunkId::Room(room_id),
3633 vec![
3634 Update::NewItemsChunk {
3635 previous: None,
3636 new: ChunkIdentifier::new(0),
3637 next: None,
3638 },
3639 Update::PushItems {
3640 at: Position::new(ChunkIdentifier::new(0), 0),
3641 items: vec![ev1],
3642 },
3643 Update::NewItemsChunk {
3644 previous: Some(ChunkIdentifier::new(0)),
3645 new: ChunkIdentifier::new(1),
3646 next: None,
3647 },
3648 Update::PushItems {
3649 at: Position::new(ChunkIdentifier::new(1), 0),
3650 items: vec![ev2],
3651 },
3652 ],
3653 )
3654 .await
3655 .unwrap();
3656 }
3657
3658 let event_cache = client.event_cache();
3659 event_cache.subscribe().unwrap();
3660
3661 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3662 let room = client.get_room(room_id).unwrap();
3663 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3664
3665 let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
3667 assert_eq!(events1.len(), 1);
3668 assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
3669 assert!(stream1.is_empty());
3670
3671 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3673 assert_eq!(outcome.events.len(), 1);
3674 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3675 assert!(outcome.reached_start);
3676
3677 assert_let_timeout!(
3680 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
3681 );
3682 assert_eq!(diffs.len(), 1);
3683 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3684 assert_eq!(value.event_id().as_deref(), Some(evid1));
3685 });
3686
3687 assert!(stream1.is_empty());
3688
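// A second subscriber immediately sees the fully loaded timeline (both events).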
3689 let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
3693 assert_eq!(events2.len(), 2);
3694 assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
3695 assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
3696 assert!(stream2.is_empty());
3697
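// Dropping the first subscriber isn't enough to trigger the auto-shrink.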
3698 drop(stream1);
3700 yield_now().await;
3701
3702 assert!(stream2.is_empty());
3704
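// Dropping the last subscriber lets the auto-shrink kick in.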
3705 drop(stream2);
3707 yield_now().await;
3708
3709 {
3712 let state = room_event_cache.inner.state.read().await.unwrap();
3714 assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
3715 }
3716
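// After the auto-shrink, only the last chunk (one event) remains loaded in memory.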
3717 let events3 = room_event_cache.events().await.unwrap();
3719 assert_eq!(events3.len(), 1);
3720 assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
3721 }
3722
3723 #[async_test]
3724 async fn test_rfind_map_event_in_memory_by() {
3725 let user_id = user_id!("@mnt_io:matrix.org");
3726 let room_id = room_id!("!raclette:patate.ch");
3727 let client = MockClientBuilder::new(None).build().await;
3728
3729 let event_factory = EventFactory::new().room(room_id);
3730
3731 let event_id_0 = event_id!("$ev0");
3732 let event_id_1 = event_id!("$ev1");
3733 let event_id_2 = event_id!("$ev2");
3734 let event_id_3 = event_id!("$ev3");
3735
3736 let event_0 =
3737 event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3738 let event_1 =
3739 event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3740 let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3741 let event_3 =
3742 event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3743
3744 {
3747 client
3748 .event_cache_store()
3749 .lock()
3750 .await
3751 .expect("Could not acquire the event cache lock")
3752 .as_clean()
3753 .expect("Could not acquire a clean event cache lock")
3754 .handle_linked_chunk_updates(
3755 LinkedChunkId::Room(room_id),
3756 vec![
3757 Update::NewItemsChunk {
3758 previous: None,
3759 new: ChunkIdentifier::new(0),
3760 next: None,
3761 },
3762 Update::PushItems {
3763 at: Position::new(ChunkIdentifier::new(0), 0),
3764 items: vec![event_3],
3765 },
3766 Update::NewItemsChunk {
3767 previous: Some(ChunkIdentifier::new(0)),
3768 new: ChunkIdentifier::new(1),
3769 next: None,
3770 },
3771 Update::PushItems {
3772 at: Position::new(ChunkIdentifier::new(1), 0),
3773 items: vec![event_0, event_1, event_2],
3774 },
3775 ],
3776 )
3777 .await
3778 .unwrap();
3779 }
3780
3781 let event_cache = client.event_cache();
3782 event_cache.subscribe().unwrap();
3783
3784 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3785 let room = client.get_room(room_id).unwrap();
3786 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3787
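// Looking backwards in memory, the most recent event sent by BOB is `$ev0`.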
3788 assert_matches!(
3790 room_event_cache
3791 .rfind_map_event_in_memory_by(|event| {
3792 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| event.event_id())
3793 })
3794 .await,
3795 Ok(Some(event_id)) => {
3796 assert_eq!(event_id.as_deref(), Some(event_id_0));
3797 }
3798 );
3799
3800 assert_matches!(
3803 room_event_cache
3804 .rfind_map_event_in_memory_by(|event| {
3805 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| event.event_id())
3806 })
3807 .await,
3808 Ok(Some(event_id)) => {
3809 assert_eq!(event_id.as_deref(), Some(event_id_2));
3810 }
3811 );
3812
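// `$ev3` lives in a chunk that isn't loaded in memory, so it can't be found this way.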
3813 assert!(
3815 room_event_cache
3816 .rfind_map_event_in_memory_by(|event| {
3817 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
3818 == Some(user_id))
3819 .then(|| event.event_id())
3820 })
3821 .await
3822 .unwrap()
3823 .is_none()
3824 );
3825
3826 assert!(
3828 room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap().is_none()
3829 );
3830 }
3831
3832 #[async_test]
3833 async fn test_reload_when_dirty() {
3834 let user_id = user_id!("@mnt_io:matrix.org");
3835 let room_id = room_id!("!raclette:patate.ch");
3836
3837 let event_cache_store = MemoryStore::new();
3839
3840 let client_p0 = MockClientBuilder::new(None)
3842 .on_builder(|builder| {
3843 builder.store_config(
3844 StoreConfig::new("process #0".to_owned())
3845 .event_cache_store(event_cache_store.clone()),
3846 )
3847 })
3848 .build()
3849 .await;
3850
3851 let client_p1 = MockClientBuilder::new(None)
3853 .on_builder(|builder| {
3854 builder.store_config(
3855 StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
3856 )
3857 })
3858 .build()
3859 .await;
3860
3861 let event_factory = EventFactory::new().room(room_id).sender(user_id);
3862
3863 let ev_id_0 = event_id!("$ev_0");
3864 let ev_id_1 = event_id!("$ev_1");
3865
3866 let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
3867 let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
3868
3869 client_p0
3871 .event_cache_store()
3872 .lock()
3873 .await
3874 .expect("[p0] Could not acquire the event cache lock")
3875 .as_clean()
3876 .expect("[p0] Could not acquire a clean event cache lock")
3877 .handle_linked_chunk_updates(
3878 LinkedChunkId::Room(room_id),
3879 vec![
3880 Update::NewItemsChunk {
3881 previous: None,
3882 new: ChunkIdentifier::new(0),
3883 next: None,
3884 },
3885 Update::PushItems {
3886 at: Position::new(ChunkIdentifier::new(0), 0),
3887 items: vec![ev_0],
3888 },
3889 Update::NewItemsChunk {
3890 previous: Some(ChunkIdentifier::new(0)),
3891 new: ChunkIdentifier::new(1),
3892 next: None,
3893 },
3894 Update::PushItems {
3895 at: Position::new(ChunkIdentifier::new(1), 0),
3896 items: vec![ev_1],
3897 },
3898 ],
3899 )
3900 .await
3901 .unwrap();
3902
3903 let (room_event_cache_p0, room_event_cache_p1) = {
3905 let event_cache_p0 = client_p0.event_cache();
3906 event_cache_p0.subscribe().unwrap();
3907
3908 let event_cache_p1 = client_p1.event_cache();
3909 event_cache_p1.subscribe().unwrap();
3910
3911 client_p0.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3912 client_p1.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3913
3914 let (room_event_cache_p0, _drop_handles) =
3915 client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
3916 let (room_event_cache_p1, _drop_handles) =
3917 client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
3918
3919 (room_event_cache_p0, room_event_cache_p1)
3920 };
3921
3922 let mut updates_stream_p0 = {
3927 let room_event_cache = &room_event_cache_p0;
3928
3929 let (initial_updates, mut updates_stream) =
3930 room_event_cache_p0.subscribe().await.unwrap();
3931
3932 assert_eq!(initial_updates.len(), 1);
3934 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
3935 assert!(updates_stream.is_empty());
3936
3937 assert!(event_loaded(room_event_cache, ev_id_1).await);
3939
3940 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
3942
3943 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
3945
3946 assert_matches!(
3948 updates_stream.recv().await.unwrap(),
3949 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
3950 assert_eq!(diffs.len(), 1, "{diffs:#?}");
3951 assert_matches!(
3952 &diffs[0],
3953 VectorDiff::Insert { index: 0, value: event } => {
3954 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
3955 }
3956 );
3957 }
3958 );
3959
3960 assert!(event_loaded(room_event_cache, ev_id_0).await);
3962
3963 updates_stream
3964 };
3965
3966 let mut updates_stream_p1 = {
3968 let room_event_cache = &room_event_cache_p1;
3969 let (initial_updates, mut updates_stream) =
3970 room_event_cache_p1.subscribe().await.unwrap();
3971
3972 assert_eq!(initial_updates.len(), 1);
3974 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
3975 assert!(updates_stream.is_empty());
3976
3977 assert!(event_loaded(room_event_cache, ev_id_1).await);
3979
3980 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
3982
3983 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
3985
3986 assert_matches!(
3988 updates_stream.recv().await.unwrap(),
3989 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
3990 assert_eq!(diffs.len(), 1, "{diffs:#?}");
3991 assert_matches!(
3992 &diffs[0],
3993 VectorDiff::Insert { index: 0, value: event } => {
3994 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
3995 }
3996 );
3997 }
3998 );
3999
4000 assert!(event_loaded(room_event_cache, ev_id_0).await);
4002
4003 updates_stream
4004 };
4005
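// Ping-pong between the two processes: each time the other process has paginated, this process finds its state outdated, reloads the last chunk (Clear + Append), and paginates again.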
4006 for _ in 0..3 {
4008 {
4012 let room_event_cache = &room_event_cache_p0;
4013 let updates_stream = &mut updates_stream_p0;
4014
4015 assert!(event_loaded(room_event_cache, ev_id_1).await);
4017
4018 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4021
4022 assert_matches!(
4024 updates_stream.recv().await.unwrap(),
4025 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4026 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4027 assert_matches!(&diffs[0], VectorDiff::Clear);
4028 assert_matches!(
4029 &diffs[1],
4030 VectorDiff::Append { values: events } => {
4031 assert_eq!(events.len(), 1);
4032 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4033 }
4034 );
4035 }
4036 );
4037
4038 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4040
4041 assert!(event_loaded(room_event_cache, ev_id_0).await);
4043
4044 assert_matches!(
4046 updates_stream.recv().await.unwrap(),
4047 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4048 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4049 assert_matches!(
4050 &diffs[0],
4051 VectorDiff::Insert { index: 0, value: event } => {
4052 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4053 }
4054 );
4055 }
4056 );
4057 }
4058
4059 {
4063 let room_event_cache = &room_event_cache_p1;
4064 let updates_stream = &mut updates_stream_p1;
4065
4066 assert!(event_loaded(room_event_cache, ev_id_1).await);
4068
4069 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4072
4073 assert_matches!(
4075 updates_stream.recv().await.unwrap(),
4076 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4077 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4078 assert_matches!(&diffs[0], VectorDiff::Clear);
4079 assert_matches!(
4080 &diffs[1],
4081 VectorDiff::Append { values: events } => {
4082 assert_eq!(events.len(), 1);
4083 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4084 }
4085 );
4086 }
4087 );
4088
4089 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4091
4092 assert!(event_loaded(room_event_cache, ev_id_0).await);
4094
4095 assert_matches!(
4097 updates_stream.recv().await.unwrap(),
4098 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4099 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4100 assert_matches!(
4101 &diffs[0],
4102 VectorDiff::Insert { index: 0, value: event } => {
4103 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4104 }
4105 );
4106 }
4107 );
4108 }
4109 }
4110
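// Same ping-pong, but take a read guard on the state first: the state is already clean by the time the guard is obtained, and the reload surfaces as Clear + Append on the update stream.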
4111 for _ in 0..3 {
4114 {
4115 let room_event_cache = &room_event_cache_p0;
4116 let updates_stream = &mut updates_stream_p0;
4117
4118 let guard = room_event_cache.inner.state.read().await.unwrap();
4119
4120 assert!(guard.is_dirty().not());
4126
4127 assert_matches!(
4129 updates_stream.recv().await.unwrap(),
4130 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4131 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4132 assert_matches!(&diffs[0], VectorDiff::Clear);
4133 assert_matches!(
4134 &diffs[1],
4135 VectorDiff::Append { values: events } => {
4136 assert_eq!(events.len(), 1);
4137 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4138 }
4139 );
4140 }
4141 );
4142
4143 assert!(event_loaded(room_event_cache, ev_id_1).await);
4144 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4145
4146 drop(guard);
4152
4153 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4154 assert!(event_loaded(room_event_cache, ev_id_0).await);
4155
4156 assert_matches!(
4158 updates_stream.recv().await.unwrap(),
4159 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4160 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4161 assert_matches!(
4162 &diffs[0],
4163 VectorDiff::Insert { index: 0, value: event } => {
4164 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4165 }
4166 );
4167 }
4168 );
4169 }
4170
4171 {
4172 let room_event_cache = &room_event_cache_p1;
4173 let updates_stream = &mut updates_stream_p1;
4174
4175 let guard = room_event_cache.inner.state.read().await.unwrap();
4176
4177 assert!(guard.is_dirty().not());
4182
4183 assert_matches!(
4185 updates_stream.recv().await.unwrap(),
4186 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4187 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4188 assert_matches!(&diffs[0], VectorDiff::Clear);
4189 assert_matches!(
4190 &diffs[1],
4191 VectorDiff::Append { values: events } => {
4192 assert_eq!(events.len(), 1);
4193 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4194 }
4195 );
4196 }
4197 );
4198
4199 assert!(event_loaded(room_event_cache, ev_id_1).await);
4200 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4201
4202 drop(guard);
4208
4209 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4210 assert!(event_loaded(room_event_cache, ev_id_0).await);
4211
4212 assert_matches!(
4214 updates_stream.recv().await.unwrap(),
4215 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4216 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4217 assert_matches!(
4218 &diffs[0],
4219 VectorDiff::Insert { index: 0, value: event } => {
4220 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4221 }
4222 );
4223 }
4224 );
4225 }
4226 }
4227
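// And once more, this time taking a write guard on the state.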
4228 for _ in 0..3 {
4230 {
4231 let room_event_cache = &room_event_cache_p0;
4232 let updates_stream = &mut updates_stream_p0;
4233
4234 let guard = room_event_cache.inner.state.write().await.unwrap();
4235
4236 assert!(guard.is_dirty().not());
4238
4239 assert_matches!(
4241 updates_stream.recv().await.unwrap(),
4242 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4243 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4244 assert_matches!(&diffs[0], VectorDiff::Clear);
4245 assert_matches!(
4246 &diffs[1],
4247 VectorDiff::Append { values: events } => {
4248 assert_eq!(events.len(), 1);
4249 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4250 }
4251 );
4252 }
4253 );
4254
4255 drop(guard);
4258
4259 assert!(event_loaded(room_event_cache, ev_id_1).await);
4260 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4261
4262 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4263 assert!(event_loaded(room_event_cache, ev_id_0).await);
4264
4265 assert_matches!(
4267 updates_stream.recv().await.unwrap(),
4268 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4269 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4270 assert_matches!(
4271 &diffs[0],
4272 VectorDiff::Insert { index: 0, value: event } => {
4273 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4274 }
4275 );
4276 }
4277 );
4278 }
4279
4280 {
4281 let room_event_cache = &room_event_cache_p1;
4282 let updates_stream = &mut updates_stream_p1;
4283
4284 let guard = room_event_cache.inner.state.write().await.unwrap();
4285
4286 assert!(guard.is_dirty().not());
4288
4289 assert_matches!(
4291 updates_stream.recv().await.unwrap(),
4292 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4293 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4294 assert_matches!(&diffs[0], VectorDiff::Clear);
4295 assert_matches!(
4296 &diffs[1],
4297 VectorDiff::Append { values: events } => {
4298 assert_eq!(events.len(), 1);
4299 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4300 }
4301 );
4302 }
4303 );
4304
4305 drop(guard);
4308
4309 assert!(event_loaded(room_event_cache, ev_id_1).await);
4310 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4311
4312 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4313 assert!(event_loaded(room_event_cache, ev_id_0).await);
4314
4315 assert_matches!(
4317 updates_stream.recv().await.unwrap(),
4318 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4319 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4320 assert_matches!(
4321 &diffs[0],
4322 VectorDiff::Insert { index: 0, value: event } => {
4323 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4324 }
4325 );
4326 }
4327 );
4328 }
4329 }
4330 }
4331
4332 #[async_test]
4333 async fn test_load_when_dirty() {
4334 let room_id_0 = room_id!("!raclette:patate.ch");
4335 let room_id_1 = room_id!("!morbiflette:patate.ch");
4336
4337 let event_cache_store = MemoryStore::new();
4339
4340 let client_p0 = MockClientBuilder::new(None)
4342 .on_builder(|builder| {
4343 builder.store_config(
4344 StoreConfig::new("process #0".to_owned())
4345 .event_cache_store(event_cache_store.clone()),
4346 )
4347 })
4348 .build()
4349 .await;
4350
4351 let client_p1 = MockClientBuilder::new(None)
4353 .on_builder(|builder| {
4354 builder.store_config(
4355 StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
4356 )
4357 })
4358 .build()
4359 .await;
4360
4361 let (room_event_cache_0_p0, room_event_cache_0_p1) = {
4363 let event_cache_p0 = client_p0.event_cache();
4364 event_cache_p0.subscribe().unwrap();
4365
4366 let event_cache_p1 = client_p1.event_cache();
4367 event_cache_p1.subscribe().unwrap();
4368
4369 client_p0
4370 .base_client()
4371 .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4372 client_p0
4373 .base_client()
4374 .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4375
4376 client_p1
4377 .base_client()
4378 .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4379 client_p1
4380 .base_client()
4381 .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4382
4383 let (room_event_cache_0_p0, _drop_handles) =
4384 client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4385 let (room_event_cache_0_p1, _drop_handles) =
4386 client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4387
4388 (room_event_cache_0_p0, room_event_cache_0_p1)
4389 };
4390
4391 {
4393 drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
4394 drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
4395 }
4396
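// Load room 1 for the first time in process #0: a freshly loaded room must not be reported as dirty.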
4397 let (room_event_cache_1_p0, _) =
4401 client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
4402
4403 {
4405 let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
4406 assert!(guard.is_dirty().not());
4407 }
4408
4409 }
4412
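// Returns `true` if the given event is currently loaded in the room's in-memory linked chunk.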
4413 async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
4414 room_event_cache
4415 .rfind_map_event_in_memory_by(|event| {
4416 (event.event_id().as_deref() == Some(event_id)).then_some(())
4417 })
4418 .await
4419 .unwrap()
4420 .is_some()
4421 }
4422 }