1use std::{
18 collections::BTreeMap,
19 fmt,
20 ops::{Deref, DerefMut},
21 sync::{
22 Arc,
23 atomic::{AtomicUsize, Ordering},
24 },
25};
26
27use events::sort_positions_descending;
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31 deserialized_responses::AmbiguityChange,
32 event_cache::Event,
33 linked_chunk::Position,
34 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
35};
36use ruma::{
37 EventId, OwnedEventId, OwnedRoomId, RoomId,
38 api::Direction,
39 events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
40 serde::Raw,
41};
42use tokio::sync::{
43 Notify,
44 broadcast::{Receiver, Sender},
45 mpsc,
46};
47use tracing::{instrument, trace, warn};
48
49use super::{
50 AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
51 RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
52};
53use crate::{
54 client::WeakClient,
55 event_cache::EventCacheError,
56 room::{IncludeRelations, RelationsOptions, WeakRoom},
57};
58
59pub(super) mod events;
60mod threads;
61
62pub use threads::ThreadEventCacheUpdate;
63
64#[derive(Clone)]
68pub struct RoomEventCache {
69 pub(super) inner: Arc<RoomEventCacheInner>,
70}
71
72impl fmt::Debug for RoomEventCache {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 f.debug_struct("RoomEventCache").finish_non_exhaustive()
75 }
76}
77
78#[allow(missing_debug_implementations)]
88pub struct RoomEventCacheSubscriber {
89 recv: Receiver<RoomEventCacheUpdate>,
91
92 room_id: OwnedRoomId,
94
95 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
97
98 subscriber_count: Arc<AtomicUsize>,
100}
101
102impl Drop for RoomEventCacheSubscriber {
103 fn drop(&mut self) {
104 let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
105
106 trace!(
107 "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
108 );
109
110 if previous_subscriber_count == 1 {
111 let mut room_id = self.room_id.clone();
115
116 let mut num_attempts = 0;
122
123 while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
124 num_attempts += 1;
125
126 if num_attempts > 1024 {
127 warn!(
130 "couldn't send notification to the auto-shrink channel \
131 after 1024 attempts; giving up"
132 );
133 return;
134 }
135
136 match err {
137 mpsc::error::TrySendError::Full(stolen_room_id) => {
138 room_id = stolen_room_id;
139 }
140 mpsc::error::TrySendError::Closed(_) => return,
141 }
142 }
143
144 trace!("sent notification to the parent channel that we were the last subscriber");
145 }
146 }
147}
148
149impl Deref for RoomEventCacheSubscriber {
150 type Target = Receiver<RoomEventCacheUpdate>;
151
152 fn deref(&self) -> &Self::Target {
153 &self.recv
154 }
155}
156
157impl DerefMut for RoomEventCacheSubscriber {
158 fn deref_mut(&mut self) -> &mut Self::Target {
159 &mut self.recv
160 }
161}
162
163impl RoomEventCache {
164 pub(super) fn new(
166 client: WeakClient,
167 state: RoomEventCacheStateLock,
168 pagination_status: SharedObservable<RoomPaginationStatus>,
169 room_id: OwnedRoomId,
170 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
171 update_sender: Sender<RoomEventCacheUpdate>,
172 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
173 ) -> Self {
174 Self {
175 inner: Arc::new(RoomEventCacheInner::new(
176 client,
177 state,
178 pagination_status,
179 room_id,
180 auto_shrink_sender,
181 update_sender,
182 generic_update_sender,
183 )),
184 }
185 }
186
187 pub fn room_id(&self) -> &RoomId {
189 &self.inner.room_id
190 }
191
192 pub async fn events(&self) -> Result<Vec<Event>> {
197 let state = self.inner.state.read().await?;
198
199 Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
200 }
201
202 pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
209 let state = self.inner.state.read().await?;
210 let events =
211 state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
212
213 let subscriber_count = state.subscriber_count();
214 let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
215 trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
216
217 let recv = self.inner.update_sender.subscribe();
218 let subscriber = RoomEventCacheSubscriber {
219 recv,
220 room_id: self.inner.room_id.clone(),
221 auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
222 subscriber_count: subscriber_count.clone(),
223 };
224
225 Ok((events, subscriber))
226 }
227
228 pub async fn subscribe_to_thread(
231 &self,
232 thread_root: OwnedEventId,
233 ) -> Result<(Vec<Event>, Receiver<ThreadEventCacheUpdate>)> {
234 let mut state = self.inner.state.write().await?;
235 Ok(state.subscribe_to_thread(thread_root))
236 }
237
238 #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
243 pub async fn paginate_thread_backwards(
244 &self,
245 thread_root: OwnedEventId,
246 num_events: u16,
247 ) -> Result<bool> {
248 let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
249
250 let mut outcome =
252 self.inner.state.write().await?.load_more_thread_events_backwards(thread_root.clone());
253
254 loop {
255 match outcome {
256 LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
257 let options = RelationsOptions {
259 from: prev_token.clone(),
260 dir: Direction::Backward,
261 limit: Some(num_events.into()),
262 include_relations: IncludeRelations::AllRelations,
263 recurse: true,
264 };
265
266 let mut result = room
267 .relations(thread_root.clone(), options)
268 .await
269 .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
270
271 let reached_start = result.next_batch_token.is_none();
272 trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");
273
274 let root_event =
277 if reached_start {
278 Some(room.load_or_fetch_event(&thread_root, None).await.map_err(
280 |err| EventCacheError::BackpaginationError(Box::new(err)),
281 )?)
282 } else {
283 None
284 };
285
286 let mut state = self.inner.state.write().await?;
287
288 state.save_events(result.chunk.iter().cloned()).await?;
290
291 result.chunk.extend(root_event);
294
295 if let Some(outcome) = state.finish_thread_network_pagination(
296 thread_root.clone(),
297 prev_token,
298 result.next_batch_token,
299 result.chunk,
300 ) {
301 return Ok(outcome.reached_start);
302 }
303
304 outcome = state.load_more_thread_events_backwards(thread_root.clone());
306 }
307
308 LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
309 return Ok(true);
311 }
312
313 LoadMoreEventsBackwardsOutcome::Events { .. } => {
314 unimplemented!("loading from disk for threads is not implemented yet");
316 }
317 }
318 }
319 }
320
321 pub fn pagination(&self) -> RoomPagination {
324 RoomPagination { inner: self.inner.clone() }
325 }
326
327 pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
336 where
337 P: FnMut(&Event, Option<&Event>) -> Option<O>,
338 {
339 Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
340 }
341
342 pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
347 Ok(self
348 .inner
349 .state
350 .read()
351 .await?
352 .find_event(event_id)
353 .await
354 .ok()
355 .flatten()
356 .map(|(_loc, event)| event))
357 }
358
359 pub async fn find_event_with_relations(
371 &self,
372 event_id: &EventId,
373 filter: Option<Vec<RelationType>>,
374 ) -> Result<Option<(Event, Vec<Event>)>> {
375 Ok(self
377 .inner
378 .state
379 .read()
380 .await?
381 .find_event_with_relations(event_id, filter.clone())
382 .await
383 .ok()
384 .flatten())
385 }
386
387 pub async fn find_event_relations(
399 &self,
400 event_id: &EventId,
401 filter: Option<Vec<RelationType>>,
402 ) -> Result<Vec<Event>> {
403 self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
405 }
406
407 pub async fn clear(&self) -> Result<()> {
412 let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
414
415 let _ = self.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
417 diffs: updates_as_vector_diffs,
418 origin: EventsOrigin::Cache,
419 });
420
421 let _ = self
423 .inner
424 .generic_update_sender
425 .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });
426
427 Ok(())
428 }
429
430 pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
432 self.inner
433 .handle_timeline(
434 Timeline { limited: false, prev_batch: None, events: vec![event] },
435 Vec::new(),
436 BTreeMap::new(),
437 )
438 .await
439 }
440
441 pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
444 match self.inner.state.write().await {
445 Ok(mut state_guard) => {
446 if let Err(err) = state_guard.save_events(events).await {
447 warn!("couldn't save event in the event cache: {err}");
448 }
449 }
450
451 Err(err) => {
452 warn!("couldn't save event in the event cache: {err}");
453 }
454 }
455 }
456
457 pub async fn debug_string(&self) -> Vec<String> {
460 match self.inner.state.read().await {
461 Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
462 Err(err) => {
463 warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
464
465 vec![]
466 }
467 }
468 }
469}
470
471pub(super) struct RoomEventCacheInner {
473 pub(super) room_id: OwnedRoomId,
475
476 pub weak_room: WeakRoom,
477
478 pub state: RoomEventCacheStateLock,
480
481 pub pagination_batch_token_notifier: Notify,
483
484 pub pagination_status: SharedObservable<RoomPaginationStatus>,
485
486 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
491
492 pub update_sender: Sender<RoomEventCacheUpdate>,
494
495 pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
501}
502
503impl RoomEventCacheInner {
504 fn new(
507 client: WeakClient,
508 state: RoomEventCacheStateLock,
509 pagination_status: SharedObservable<RoomPaginationStatus>,
510 room_id: OwnedRoomId,
511 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
512 update_sender: Sender<RoomEventCacheUpdate>,
513 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
514 ) -> Self {
515 let weak_room = WeakRoom::new(client, room_id);
516
517 Self {
518 room_id: weak_room.room_id().to_owned(),
519 weak_room,
520 state,
521 update_sender,
522 pagination_batch_token_notifier: Default::default(),
523 auto_shrink_sender,
524 pagination_status,
525 generic_update_sender,
526 }
527 }
528
529 fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
530 if account_data.is_empty() {
531 return;
532 }
533
534 let mut handled_read_marker = false;
535
536 trace!("Handling account data");
537
538 for raw_event in account_data {
539 match raw_event.deserialize() {
540 Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
541 if handled_read_marker {
544 continue;
545 }
546
547 handled_read_marker = true;
548
549 let _ = self.update_sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
551 event_id: ev.content.event_id,
552 });
553 }
554
555 Ok(_) => {
556 }
559
560 Err(e) => {
561 let event_type = raw_event.get_field::<String>("type").ok().flatten();
562 warn!(event_type, "Failed to deserialize account data: {e}");
563 }
564 }
565 }
566 }
567
568 #[instrument(skip_all, fields(room_id = %self.room_id))]
569 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
570 self.handle_timeline(
571 updates.timeline,
572 updates.ephemeral.clone(),
573 updates.ambiguity_changes,
574 )
575 .await?;
576 self.handle_account_data(updates.account_data);
577
578 Ok(())
579 }
580
581 #[instrument(skip_all, fields(room_id = %self.room_id))]
582 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
583 self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
584
585 Ok(())
586 }
587
588 async fn handle_timeline(
591 &self,
592 timeline: Timeline,
593 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
594 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
595 ) -> Result<()> {
596 if timeline.events.is_empty()
597 && timeline.prev_batch.is_none()
598 && ephemeral_events.is_empty()
599 && ambiguity_changes.is_empty()
600 {
601 return Ok(());
602 }
603
604 trace!("adding new events");
606
607 let (stored_prev_batch_token, timeline_event_diffs) =
608 self.state.write().await?.handle_sync(timeline).await?;
609
610 if stored_prev_batch_token {
613 self.pagination_batch_token_notifier.notify_one();
614 }
615
616 if !timeline_event_diffs.is_empty() {
619 let _ = self.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
620 diffs: timeline_event_diffs,
621 origin: EventsOrigin::Sync,
622 });
623
624 let _ = self
625 .generic_update_sender
626 .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
627 }
628
629 if !ephemeral_events.is_empty() {
630 let _ = self
631 .update_sender
632 .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
633 }
634
635 if !ambiguity_changes.is_empty() {
636 let _ =
637 self.update_sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
638 }
639
640 Ok(())
641 }
642}
643
644#[derive(Debug)]
647pub(super) enum LoadMoreEventsBackwardsOutcome {
648 Gap {
650 prev_token: Option<String>,
653 },
654
655 StartOfTimeline,
657
658 Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
660}
661
662mod private {
664 use std::{
665 collections::{BTreeMap, HashMap, HashSet},
666 sync::{
667 Arc,
668 atomic::{AtomicBool, AtomicUsize, Ordering},
669 },
670 };
671
672 use eyeball::SharedObservable;
673 use eyeball_im::VectorDiff;
674 use itertools::Itertools;
675 use matrix_sdk_base::{
676 apply_redaction,
677 deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
678 event_cache::{
679 Event, Gap,
680 store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState},
681 },
682 linked_chunk::{
683 ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
684 OwnedLinkedChunkId, Position, Update, lazy_loader,
685 },
686 serde_helpers::{extract_edit_target, extract_thread_root},
687 sync::Timeline,
688 };
689 use matrix_sdk_common::executor::spawn;
690 use ruma::{
691 EventId, OwnedEventId, OwnedRoomId, RoomId,
692 events::{
693 AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
694 relation::RelationType, room::redaction::SyncRoomRedactionEvent,
695 },
696 room_version_rules::RoomVersionRules,
697 serde::Raw,
698 };
699 use tokio::sync::{
700 Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
701 broadcast::{Receiver, Sender},
702 };
703 use tracing::{debug, error, instrument, trace, warn};
704
705 use super::{
706 super::{
707 BackPaginationOutcome, EventCacheError, RoomEventCacheLinkedChunkUpdate,
708 RoomPaginationStatus, ThreadEventCacheUpdate,
709 deduplicator::{DeduplicationOutcome, filter_duplicate_events},
710 room::threads::ThreadEventCache,
711 },
712 EventLocation, EventsOrigin, LoadMoreEventsBackwardsOutcome, RoomEventCacheGenericUpdate,
713 RoomEventCacheUpdate,
714 events::EventLinkedChunk,
715 sort_positions_descending,
716 };
717
718 pub struct RoomEventCacheStateLock {
723 locked_state: RwLock<RoomEventCacheStateLockInner>,
725
726 read_lock_acquisition: Mutex<()>,
729 }
730
731 struct RoomEventCacheStateLockInner {
732 enabled_thread_support: bool,
734
735 room_id: OwnedRoomId,
737
738 store: EventCacheStoreLock,
740
741 room_linked_chunk: EventLinkedChunk,
744
745 threads: HashMap<OwnedEventId, ThreadEventCache>,
749
750 pagination_status: SharedObservable<RoomPaginationStatus>,
751
752 update_sender: Sender<RoomEventCacheUpdate>,
757
758 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
763
764 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
767
768 room_version_rules: RoomVersionRules,
770
771 waited_for_initial_prev_token: Arc<AtomicBool>,
776
777 subscriber_count: Arc<AtomicUsize>,
780 }
781
782 impl RoomEventCacheStateLock {
783 #[allow(clippy::too_many_arguments)]
794 pub async fn new(
795 room_id: OwnedRoomId,
796 room_version_rules: RoomVersionRules,
797 enabled_thread_support: bool,
798 update_sender: Sender<RoomEventCacheUpdate>,
799 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
800 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
801 store: EventCacheStoreLock,
802 pagination_status: SharedObservable<RoomPaginationStatus>,
803 ) -> Result<Self, EventCacheError> {
804 let store_guard = match store.lock().await? {
805 EventCacheStoreLockState::Clean(guard) => guard,
807
808 EventCacheStoreLockState::Dirty(guard) => {
811 EventCacheStoreLockGuard::clear_dirty(&guard);
812
813 guard
814 }
815 };
816
817 let linked_chunk_id = LinkedChunkId::Room(&room_id);
818
819 let full_linked_chunk_metadata =
824 match load_linked_chunk_metadata(&store_guard, linked_chunk_id).await {
825 Ok(metas) => metas,
826 Err(err) => {
827 error!(
828 "error when loading a linked chunk's metadata from the store: {err}"
829 );
830
831 store_guard
833 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
834 .await?;
835
836 None
838 }
839 };
840
841 let linked_chunk = match store_guard
842 .load_last_chunk(linked_chunk_id)
843 .await
844 .map_err(EventCacheError::from)
845 .and_then(|(last_chunk, chunk_identifier_generator)| {
846 lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
847 .map_err(EventCacheError::from)
848 }) {
849 Ok(linked_chunk) => linked_chunk,
850 Err(err) => {
851 error!(
852 "error when loading a linked chunk's latest chunk from the store: {err}"
853 );
854
855 store_guard
857 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
858 .await?;
859
860 None
861 }
862 };
863
864 let waited_for_initial_prev_token = Arc::new(AtomicBool::new(false));
865
866 Ok(Self {
867 locked_state: RwLock::new(RoomEventCacheStateLockInner {
868 enabled_thread_support,
869 room_id,
870 store,
871 room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
872 linked_chunk,
873 full_linked_chunk_metadata,
874 ),
875 threads: HashMap::new(),
880 pagination_status,
881 update_sender,
882 generic_update_sender,
883 linked_chunk_update_sender,
884 room_version_rules,
885 waited_for_initial_prev_token,
886 subscriber_count: Default::default(),
887 }),
888 read_lock_acquisition: Mutex::new(()),
889 })
890 }
891
892 pub async fn read(&self) -> Result<RoomEventCacheStateLockReadGuard<'_>, EventCacheError> {
902 let _one_reader_guard = self.read_lock_acquisition.lock().await;
949
950 let state_guard = self.locked_state.read().await;
952
953 match state_guard.store.lock().await? {
954 EventCacheStoreLockState::Clean(store_guard) => {
955 Ok(RoomEventCacheStateLockReadGuard { state: state_guard, store: store_guard })
956 }
957 EventCacheStoreLockState::Dirty(store_guard) => {
958 drop(state_guard);
962 let state_guard = self.locked_state.write().await;
963
964 let mut guard = RoomEventCacheStateLockWriteGuard {
965 state: state_guard,
966 store: store_guard,
967 };
968
969 let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;
971
972 EventCacheStoreLockGuard::clear_dirty(&guard.store);
974
975 let guard = guard.downgrade();
977
978 if !updates_as_vector_diffs.is_empty() {
980 let _ = guard.state.update_sender.send(
982 RoomEventCacheUpdate::UpdateTimelineEvents {
983 diffs: updates_as_vector_diffs,
984 origin: EventsOrigin::Cache,
985 },
986 );
987
988 let _ =
990 guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
991 room_id: guard.state.room_id.clone(),
992 });
993 }
994
995 Ok(guard)
996 }
997 }
998 }
999
1000 pub async fn write(
1011 &self,
1012 ) -> Result<RoomEventCacheStateLockWriteGuard<'_>, EventCacheError> {
1013 let state_guard = self.locked_state.write().await;
1014
1015 match state_guard.store.lock().await? {
1016 EventCacheStoreLockState::Clean(store_guard) => {
1017 Ok(RoomEventCacheStateLockWriteGuard { state: state_guard, store: store_guard })
1018 }
1019 EventCacheStoreLockState::Dirty(store_guard) => {
1020 let mut guard = RoomEventCacheStateLockWriteGuard {
1021 state: state_guard,
1022 store: store_guard,
1023 };
1024
1025 let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;
1027
1028 EventCacheStoreLockGuard::clear_dirty(&guard.store);
1030
1031 if !updates_as_vector_diffs.is_empty() {
1033 let _ = guard.state.update_sender.send(
1035 RoomEventCacheUpdate::UpdateTimelineEvents {
1036 diffs: updates_as_vector_diffs,
1037 origin: EventsOrigin::Cache,
1038 },
1039 );
1040
1041 let _ =
1043 guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
1044 room_id: guard.state.room_id.clone(),
1045 });
1046 }
1047
1048 Ok(guard)
1049 }
1050 }
1051 }
1052 }
1053
1054 pub struct RoomEventCacheStateLockReadGuard<'a> {
1056 state: RwLockReadGuard<'a, RoomEventCacheStateLockInner>,
1059
1060 store: EventCacheStoreLockGuard,
1062 }
1063
1064 pub struct RoomEventCacheStateLockWriteGuard<'a> {
1066 state: RwLockWriteGuard<'a, RoomEventCacheStateLockInner>,
1069
1070 store: EventCacheStoreLockGuard,
1072 }
1073
1074 impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
1075 fn downgrade(self) -> RoomEventCacheStateLockReadGuard<'a> {
1083 RoomEventCacheStateLockReadGuard { state: self.state.downgrade(), store: self.store }
1084 }
1085 }
1086
1087 impl<'a> RoomEventCacheStateLockReadGuard<'a> {
1088 pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
1090 &self.state.room_linked_chunk
1091 }
1092
1093 pub fn subscriber_count(&self) -> &Arc<AtomicUsize> {
1094 &self.state.subscriber_count
1095 }
1096
1097 pub async fn find_event(
1102 &self,
1103 event_id: &EventId,
1104 ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1105 find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1106 .await
1107 }
1108
1109 pub async fn find_event_with_relations(
1123 &self,
1124 event_id: &EventId,
1125 filters: Option<Vec<RelationType>>,
1126 ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1127 find_event_with_relations(
1128 event_id,
1129 &self.state.room_id,
1130 filters,
1131 &self.state.room_linked_chunk,
1132 &self.store,
1133 )
1134 .await
1135 }
1136
1137 pub async fn find_event_relations(
1151 &self,
1152 event_id: &EventId,
1153 filters: Option<Vec<RelationType>>,
1154 ) -> Result<Vec<Event>, EventCacheError> {
1155 find_event_relations(
1156 event_id,
1157 &self.state.room_id,
1158 filters,
1159 &self.state.room_linked_chunk,
1160 &self.store,
1161 )
1162 .await
1163 }
1164
1165 pub fn rfind_map_event_in_memory_by<'i, O, P>(&'i self, mut predicate: P) -> Option<O>
1174 where
1175 P: FnMut(&'i Event, Option<&'i Event>) -> Option<O>,
1176 {
1177 self.state
1178 .room_linked_chunk
1179 .revents()
1180 .peekable()
1181 .batching(|iter| {
1182 iter.next().map(|(_position, event)| {
1183 (event, iter.peek().map(|(_next_position, next_event)| *next_event))
1184 })
1185 })
1186 .find_map(|(event, next_event)| predicate(event, next_event))
1187 }
1188
1189 #[cfg(test)]
1190 pub fn is_dirty(&self) -> bool {
1191 EventCacheStoreLockGuard::is_dirty(&self.store)
1192 }
1193 }
1194
1195 impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
1196 #[cfg(any(feature = "e2e-encryption", test))]
1198 pub fn room_linked_chunk(&mut self) -> &mut EventLinkedChunk {
1199 &mut self.state.room_linked_chunk
1200 }
1201
1202 pub fn waited_for_initial_prev_token(&self) -> &Arc<AtomicBool> {
1204 &self.state.waited_for_initial_prev_token
1205 }
1206
1207 pub async fn find_event(
1212 &self,
1213 event_id: &EventId,
1214 ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1215 find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1216 .await
1217 }
1218
1219 pub async fn find_event_with_relations(
1233 &self,
1234 event_id: &EventId,
1235 filters: Option<Vec<RelationType>>,
1236 ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1237 find_event_with_relations(
1238 event_id,
1239 &self.state.room_id,
1240 filters,
1241 &self.state.room_linked_chunk,
1242 &self.store,
1243 )
1244 .await
1245 }
1246
1247 pub async fn load_more_events_backwards(
1249 &mut self,
1250 ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
1251 if let Some(prev_token) = self.state.room_linked_chunk.rgap().map(|gap| gap.prev_token)
1254 {
1255 return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
1256 }
1257
1258 let prev_first_chunk = self
1259 .state
1260 .room_linked_chunk
1261 .chunks()
1262 .next()
1263 .expect("a linked chunk is never empty");
1264
1265 let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
1267 let new_first_chunk = match self
1268 .store
1269 .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
1270 .await
1271 {
1272 Ok(Some(new_first_chunk)) => {
1273 new_first_chunk
1275 }
1276
1277 Ok(None) => {
1278 if self.state.room_linked_chunk.events().next().is_some() {
1283 trace!("chunk is fully loaded and non-empty: reached_start=true");
1286 return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
1287 }
1288
1289 return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: None });
1291 }
1292
1293 Err(err) => {
1294 error!("error when loading the previous chunk of a linked chunk: {err}");
1295
1296 self.store
1298 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
1299 .await?;
1300
1301 return Err(err.into());
1303 }
1304 };
1305
1306 let chunk_content = new_first_chunk.content.clone();
1307
1308 let reached_start = new_first_chunk.previous.is_none();
1314
1315 if let Err(err) =
1316 self.state.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk)
1317 {
1318 error!("error when inserting the previous chunk into its linked chunk: {err}");
1319
1320 self.store
1322 .handle_linked_chunk_updates(
1323 LinkedChunkId::Room(&self.state.room_id),
1324 vec![Update::Clear],
1325 )
1326 .await?;
1327
1328 return Err(err.into());
1330 }
1331
1332 let _ = self.state.room_linked_chunk.store_updates().take();
1335
1336 let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1338
1339 Ok(match chunk_content {
1340 ChunkContent::Gap(gap) => {
1341 trace!("reloaded chunk from disk (gap)");
1342 LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
1343 }
1344
1345 ChunkContent::Items(events) => {
1346 trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
1347 LoadMoreEventsBackwardsOutcome::Events {
1348 events,
1349 timeline_event_diffs,
1350 reached_start,
1351 }
1352 }
1353 })
1354 }
1355
1356 pub async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
1365 let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
1367 let (last_chunk, chunk_identifier_generator) =
1368 match self.store.load_last_chunk(linked_chunk_id).await {
1369 Ok(pair) => pair,
1370
1371 Err(err) => {
1372 error!("error when reloading a linked chunk from memory: {err}");
1374
1375 self.store
1377 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
1378 .await?;
1379
1380 (None, ChunkIdentifierGenerator::new_from_scratch())
1382 }
1383 };
1384
1385 debug!("unloading the linked chunk, and resetting it to its last chunk");
1386
1387 if let Err(err) =
1390 self.state.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
1391 {
1392 error!("error when replacing the linked chunk: {err}");
1393 return self.reset_internal().await;
1394 }
1395
1396 self.state
1400 .pagination_status
1401 .set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1402
1403 let _ = self.state.room_linked_chunk.store_updates().take();
1406
1407 Ok(())
1408 }
1409
1410 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1413 pub async fn auto_shrink_if_no_subscribers(
1414 &mut self,
1415 ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
1416 let subscriber_count = self.state.subscriber_count.load(Ordering::SeqCst);
1417
1418 trace!(subscriber_count, "received request to auto-shrink");
1419
1420 if subscriber_count == 0 {
1421 self.shrink_to_last_chunk().await?;
1424
1425 Ok(Some(self.state.room_linked_chunk.updates_as_vector_diffs()))
1426 } else {
1427 Ok(None)
1428 }
1429 }
1430
1431 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1433 pub async fn force_shrink_to_last_chunk(
1434 &mut self,
1435 ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1436 self.shrink_to_last_chunk().await?;
1437
1438 Ok(self.state.room_linked_chunk.updates_as_vector_diffs())
1439 }
1440
1441 #[instrument(skip_all)]
1447 pub async fn remove_events(
1448 &mut self,
1449 in_memory_events: Vec<(OwnedEventId, Position)>,
1450 in_store_events: Vec<(OwnedEventId, Position)>,
1451 ) -> Result<(), EventCacheError> {
1452 if !in_store_events.is_empty() {
1454 let mut positions = in_store_events
1455 .into_iter()
1456 .map(|(_event_id, position)| position)
1457 .collect::<Vec<_>>();
1458
1459 sort_positions_descending(&mut positions);
1460
1461 let updates = positions
1462 .into_iter()
1463 .map(|pos| Update::RemoveItem { at: pos })
1464 .collect::<Vec<_>>();
1465
1466 self.apply_store_only_updates(updates).await?;
1467 }
1468
1469 if in_memory_events.is_empty() {
1471 return Ok(());
1473 }
1474
1475 self.state
1477 .room_linked_chunk
1478 .remove_events_by_position(
1479 in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
1480 )
1481 .expect("failed to remove an event");
1482
1483 self.propagate_changes().await
1484 }
1485
1486 async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1487 let updates = self.state.room_linked_chunk.store_updates().take();
1488 self.send_updates_to_store(updates).await
1489 }
1490
1491 async fn apply_store_only_updates(
1498 &mut self,
1499 updates: Vec<Update<Event, Gap>>,
1500 ) -> Result<(), EventCacheError> {
1501 self.state.room_linked_chunk.order_tracker.map_updates(&updates);
1502 self.send_updates_to_store(updates).await
1503 }
1504
1505 async fn send_updates_to_store(
1506 &mut self,
1507 mut updates: Vec<Update<Event, Gap>>,
1508 ) -> Result<(), EventCacheError> {
1509 if updates.is_empty() {
1510 return Ok(());
1511 }
1512
1513 for update in updates.iter_mut() {
1515 match update {
1516 Update::PushItems { items, .. } => strip_relations_from_events(items),
1517 Update::ReplaceItem { item, .. } => strip_relations_from_event(item),
1518 Update::NewItemsChunk { .. }
1520 | Update::NewGapChunk { .. }
1521 | Update::RemoveChunk(_)
1522 | Update::RemoveItem { .. }
1523 | Update::DetachLastItems { .. }
1524 | Update::StartReattachItems
1525 | Update::EndReattachItems
1526 | Update::Clear => {}
1527 }
1528 }
1529
1530 let store = self.store.clone();
1537 let room_id = self.state.room_id.clone();
1538 let cloned_updates = updates.clone();
1539
1540 spawn(async move {
1541 trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
1542 let linked_chunk_id = LinkedChunkId::Room(&room_id);
1543 store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
1544 trace!("linked chunk updates applied");
1545
1546 super::Result::Ok(())
1547 })
1548 .await
1549 .expect("joining failed")?;
1550
1551 let _ = self.state.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
1553 linked_chunk_id: OwnedLinkedChunkId::Room(self.state.room_id.clone()),
1554 updates,
1555 });
1556
1557 Ok(())
1558 }
1559
1560 pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1566 self.reset_internal().await?;
1567
1568 let diff_updates = self.state.room_linked_chunk.updates_as_vector_diffs();
1569
1570 debug_assert_eq!(diff_updates.len(), 1);
1572 debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1573
1574 Ok(diff_updates)
1575 }
1576
1577 async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
1578 self.state.room_linked_chunk.reset();
1579
1580 for thread in self.state.threads.values_mut() {
1585 thread.clear();
1586 }
1587
1588 self.propagate_changes().await?;
1589
1590 self.state.waited_for_initial_prev_token.store(false, Ordering::SeqCst);
1594 self.state
1596 .pagination_status
1597 .set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1598
1599 Ok(())
1600 }
1601
1602 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1610 pub async fn handle_sync(
1611 &mut self,
1612 mut timeline: Timeline,
1613 ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
1614 let mut prev_batch = timeline.prev_batch.take();
1615
1616 let DeduplicationOutcome {
1617 all_events: events,
1618 in_memory_duplicated_event_ids,
1619 in_store_duplicated_event_ids,
1620 non_empty_all_duplicates: all_duplicates,
1621 } = filter_duplicate_events(
1622 &self.store,
1623 LinkedChunkId::Room(&self.state.room_id),
1624 &self.state.room_linked_chunk,
1625 timeline.events,
1626 )
1627 .await?;
1628
1629 if !timeline.limited && self.state.room_linked_chunk.events().next().is_some()
1642 || all_duplicates
1643 {
1644 prev_batch = None;
1645 }
1646
1647 if prev_batch.is_some() {
1648 let mut summaries_to_update = Vec::new();
1653
1654 for (thread_root, thread) in self.state.threads.iter_mut() {
1655 thread.clear();
1657
1658 summaries_to_update.push(thread_root.clone());
1659 }
1660
1661 for thread_root in summaries_to_update {
1665 let Some((location, mut target_event)) = self.find_event(&thread_root).await?
1666 else {
1667 trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
1668 continue;
1669 };
1670
1671 if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
1672 prev_summary.latest_reply = None;
1673
1674 target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);
1675
1676 self.replace_event_at(location, target_event).await?;
1677 }
1678 }
1679 }
1680
1681 if all_duplicates {
1682 return Ok((false, Vec::new()));
1685 }
1686
1687 let has_new_gap = prev_batch.is_some();
1688
1689 if !self.state.waited_for_initial_prev_token.load(Ordering::SeqCst) && has_new_gap {
1692 self.state.waited_for_initial_prev_token.store(true, Ordering::SeqCst);
1693 }
1694
1695 self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1700 .await?;
1701
1702 self.state
1703 .room_linked_chunk
1704 .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);
1705
1706 self.post_process_new_events(events, true).await?;
1707
1708 if timeline.limited && has_new_gap {
1709 self.shrink_to_last_chunk().await?;
1716 }
1717
1718 let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1719
1720 Ok((has_new_gap, timeline_event_diffs))
1721 }
1722
1723 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1731 pub async fn handle_backpagination(
1732 &mut self,
1733 events: Vec<Event>,
1734 mut new_token: Option<String>,
1735 prev_token: Option<String>,
1736 ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
1737 {
1738 let prev_gap_id = if let Some(token) = prev_token {
1741 let gap_chunk_id = self.state.room_linked_chunk.chunk_identifier(|chunk| {
1743 matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
1744 });
1745
1746 if gap_chunk_id.is_none() {
1747 return Ok(None);
1753 }
1754
1755 gap_chunk_id
1756 } else {
1757 None
1758 };
1759
1760 let DeduplicationOutcome {
1761 all_events: mut events,
1762 in_memory_duplicated_event_ids,
1763 in_store_duplicated_event_ids,
1764 non_empty_all_duplicates: all_duplicates,
1765 } = filter_duplicate_events(
1766 &self.store,
1767 LinkedChunkId::Room(&self.state.room_id),
1768 &self.state.room_linked_chunk,
1769 events,
1770 )
1771 .await?;
1772
1773 if !all_duplicates {
1787 self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1789 .await?;
1790 } else {
1791 events.clear();
1793 new_token = None;
1796 }
1797
1798 let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();
1801
1802 let new_gap = new_token.map(|prev_token| Gap { prev_token });
1803 let reached_start = self.state.room_linked_chunk.finish_back_pagination(
1804 prev_gap_id,
1805 new_gap,
1806 &topo_ordered_events,
1807 );
1808
1809 self.post_process_new_events(topo_ordered_events, false).await?;
1811
1812 let event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1813
1814 Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
1815 }
1816
1817 pub fn subscribe_to_thread(
1820 &mut self,
1821 root: OwnedEventId,
1822 ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
1823 self.get_or_reload_thread(root).subscribe()
1824 }
1825
1826 pub fn finish_thread_network_pagination(
1830 &mut self,
1831 root: OwnedEventId,
1832 prev_token: Option<String>,
1833 new_token: Option<String>,
1834 events: Vec<Event>,
1835 ) -> Option<BackPaginationOutcome> {
1836 self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
1837 }
1838
1839 pub fn load_more_thread_events_backwards(
1840 &mut self,
1841 root: OwnedEventId,
1842 ) -> LoadMoreEventsBackwardsOutcome {
1843 self.get_or_reload_thread(root).load_more_events_backwards()
1844 }
1845
1846 pub(in super::super) async fn post_process_new_events(
1855 &mut self,
1856 events: Vec<Event>,
1857 is_sync: bool,
1858 ) -> Result<(), EventCacheError> {
1859 self.propagate_changes().await?;
1861
1862 let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();
1863
1864 for event in events {
1865 self.maybe_apply_new_redaction(&event).await?;
1866
1867 if self.state.enabled_thread_support {
1868 if is_sync {
1873 if let Some(thread_root) = extract_thread_root(event.raw()) {
1874 new_events_by_thread
1875 .entry(thread_root)
1876 .or_default()
1877 .push(event.clone());
1878 } else if let Some(event_id) = event.event_id() {
1879 if self.state.threads.contains_key(&event_id) {
1881 new_events_by_thread
1882 .entry(event_id)
1883 .or_default()
1884 .push(event.clone());
1885 }
1886 }
1887 }
1888
1889 if let Some(edit_target) = extract_edit_target(event.raw()) {
1891 if let Some((_location, edit_target_event)) =
1893 self.find_event(&edit_target).await?
1894 && let Some(thread_root) = extract_thread_root(edit_target_event.raw())
1895 {
1896 new_events_by_thread.entry(thread_root).or_default();
1899 }
1900 }
1901 }
1902
1903 if let Some(bundled_thread) = event.bundled_latest_thread_event {
1905 self.save_events([*bundled_thread]).await?;
1906 }
1907 }
1908
1909 if self.state.enabled_thread_support {
1910 self.update_threads(new_events_by_thread).await?;
1911 }
1912
1913 Ok(())
1914 }
1915
1916 fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
1917 let room_id = self.state.room_id.clone();
1920 let linked_chunk_update_sender = self.state.linked_chunk_update_sender.clone();
1921
1922 self.state.threads.entry(root_event_id.clone()).or_insert_with(|| {
1923 ThreadEventCache::new(room_id, root_event_id, linked_chunk_update_sender)
1924 })
1925 }
1926
1927 #[instrument(skip_all)]
1928 async fn update_threads(
1929 &mut self,
1930 new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
1931 ) -> Result<(), EventCacheError> {
1932 for (thread_root, new_events) in new_events_by_thread {
1933 let thread_cache = self.get_or_reload_thread(thread_root.clone());
1934
1935 thread_cache.add_live_events(new_events);
1936
1937 let mut latest_event_id = thread_cache.latest_event_id();
1938
1939 if let Some(event_id) = latest_event_id.as_ref()
1942 && let Some((_, edits)) = self
1943 .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
1944 .await?
1945 && let Some(latest_edit) = edits.last()
1946 {
1947 latest_event_id = latest_edit.event_id();
1948 }
1949
1950 self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
1951 }
1952
1953 Ok(())
1954 }
1955
1956 async fn maybe_update_thread_summary(
1958 &mut self,
1959 thread_root: OwnedEventId,
1960 latest_event_id: Option<OwnedEventId>,
1961 ) -> Result<(), EventCacheError> {
1962 let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
1966 trace!(%thread_root, "thread root event is missing from the room linked chunk");
1967 return Ok(());
1968 };
1969
1970 let prev_summary = target_event.thread_summary.summary();
1971
1972 let num_replies = {
1981 let thread_replies = self
1982 .store
1983 .find_event_relations(
1984 &self.state.room_id,
1985 &thread_root,
1986 Some(&[RelationType::Thread]),
1987 )
1988 .await?;
1989 thread_replies.len().try_into().unwrap_or(u32::MAX)
1990 };
1991
1992 let new_summary = if num_replies > 0 {
1993 Some(ThreadSummary { num_replies, latest_reply: latest_event_id })
1994 } else {
1995 None
1996 };
1997
1998 if prev_summary == new_summary.as_ref() {
1999 trace!(%thread_root, "thread summary is already up-to-date");
2000 return Ok(());
2001 }
2002
2003 trace!(%thread_root, "updating thread summary: {new_summary:?}");
2005 target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary);
2006 self.replace_event_at(location, target_event).await
2007 }
2008
2009 pub(crate) async fn replace_event_at(
2016 &mut self,
2017 location: EventLocation,
2018 event: Event,
2019 ) -> Result<(), EventCacheError> {
2020 match location {
2021 EventLocation::Memory(position) => {
2022 self.state
2023 .room_linked_chunk
2024 .replace_event_at(position, event)
2025 .expect("should have been a valid position of an item");
2026 self.propagate_changes().await?;
2029 }
2030 EventLocation::Store => {
2031 self.save_events([event]).await?;
2032 }
2033 }
2034
2035 Ok(())
2036 }
2037
2038 #[instrument(skip_all)]
2042 async fn maybe_apply_new_redaction(
2043 &mut self,
2044 event: &Event,
2045 ) -> Result<(), EventCacheError> {
2046 let raw_event = event.raw();
2047
2048 let Ok(Some(MessageLikeEventType::RoomRedaction)) =
2051 raw_event.get_field::<MessageLikeEventType>("type")
2052 else {
2053 return Ok(());
2054 };
2055
2056 let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
2059 redaction,
2060 ))) = raw_event.deserialize()
2061 else {
2062 return Ok(());
2063 };
2064
2065 let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else {
2066 warn!("missing target event id from the redaction event");
2067 return Ok(());
2068 };
2069
2070 let Some((location, mut target_event)) = self.find_event(event_id).await? else {
2072 trace!("redacted event is missing from the linked chunk");
2073 return Ok(());
2074 };
2075
2076 let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
2078 match deserialized {
2081 AnySyncTimelineEvent::MessageLike(ev) => {
2082 if ev.is_redacted() {
2083 return Ok(());
2084 }
2085 }
2086 AnySyncTimelineEvent::State(ev) => {
2087 if ev.is_redacted() {
2088 return Ok(());
2089 }
2090 }
2091 }
2092
2093 extract_thread_root(target_event.raw())
2096 } else {
2097 warn!("failed to deserialize the event to redact");
2098 None
2099 };
2100
2101 if let Some(redacted_event) = apply_redaction(
2102 target_event.raw(),
2103 event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
2104 &self.state.room_version_rules.redaction,
2105 ) {
2106 target_event.replace_raw(redacted_event.cast_unchecked());
2111
2112 self.replace_event_at(location, target_event).await?;
2113
2114 if let Some(thread_root) = thread_root
2122 && let Some(thread_cache) = self.state.threads.get_mut(&thread_root)
2123 {
2124 thread_cache.remove_if_present(event_id);
2125
2126 let latest_event_id = thread_cache.latest_event_id();
2129 self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
2130 }
2131 }
2132
2133 Ok(())
2134 }
2135
2136 pub async fn save_events(
2138 &mut self,
2139 events: impl IntoIterator<Item = Event>,
2140 ) -> Result<(), EventCacheError> {
2141 let store = self.store.clone();
2142 let room_id = self.state.room_id.clone();
2143 let events = events.into_iter().collect::<Vec<_>>();
2144
2145 spawn(async move {
2147 for event in events {
2148 store.save_event(&room_id, event).await?;
2149 }
2150 super::Result::Ok(())
2151 })
2152 .await
2153 .expect("joining failed")?;
2154
2155 Ok(())
2156 }
2157
2158 #[cfg(test)]
2159 pub fn is_dirty(&self) -> bool {
2160 EventCacheStoreLockGuard::is_dirty(&self.store)
2161 }
2162 }
2163
2164 async fn load_linked_chunk_metadata(
2170 store_guard: &EventCacheStoreLockGuard,
2171 linked_chunk_id: LinkedChunkId<'_>,
2172 ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
2173 let mut all_chunks = store_guard
2174 .load_all_chunks_metadata(linked_chunk_id)
2175 .await
2176 .map_err(EventCacheError::from)?;
2177
2178 if all_chunks.is_empty() {
2179 return Ok(None);
2181 }
2182
2183 let chunk_map: HashMap<_, _> =
2185 all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();
2186
2187 let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
2189 let Some(last) = iter.next() else {
2190 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2191 details: "no last chunk found".to_owned(),
2192 });
2193 };
2194
2195 if let Some(other_last) = iter.next() {
2197 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2198 details: format!(
2199 "chunks {} and {} both claim to be last chunks",
2200 last.identifier.index(),
2201 other_last.identifier.index()
2202 ),
2203 });
2204 }
2205
2206 let mut seen = HashSet::new();
2209 let mut current = last;
2210 loop {
2211 if !seen.insert(current.identifier) {
2213 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2214 details: format!(
2215 "cycle detected in linked chunk at {}",
2216 current.identifier.index()
2217 ),
2218 });
2219 }
2220
2221 let Some(prev_id) = current.previous else {
2222 if seen.len() != all_chunks.len() {
2224 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2225 details: format!(
2226 "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
2227 seen.len(),
2228 all_chunks.len()
2229 ),
2230 });
2231 }
2232 break;
2233 };
2234
2235 let Some(pred_meta) = chunk_map.get(&prev_id) else {
2238 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2239 details: format!(
2240 "missing predecessor {} chunk for {}",
2241 prev_id.index(),
2242 current.identifier.index()
2243 ),
2244 });
2245 };
2246
2247 if pred_meta.next != Some(current.identifier) {
2249 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2250 details: format!(
2251 "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
2252 pred_meta.identifier.index(),
2253 pred_meta.next.map(|chunk_id| chunk_id.index()),
2254 current.identifier.index()
2255 ),
2256 });
2257 }
2258
2259 current = *pred_meta;
2260 }
2261
2262 let mut current = current.identifier;
2270 for i in 0..all_chunks.len() {
2271 let j = all_chunks
2273 .iter()
2274 .rev()
2275 .position(|meta| meta.identifier == current)
2276 .map(|j| all_chunks.len() - 1 - j)
2277 .expect("the target chunk must be present in the metadata");
2278 if i != j {
2279 all_chunks.swap(i, j);
2280 }
2281 if let Some(next) = all_chunks[i].next {
2282 current = next;
2283 }
2284 }
2285
2286 Ok(Some(all_chunks))
2287 }
2288
2289 fn strip_relations_if_present<T>(event: &mut Raw<T>) {
2293 let mut closure = || -> Option<()> {
2297 let mut val: serde_json::Value = event.deserialize_as().ok()?;
2298 let unsigned = val.get_mut("unsigned")?;
2299 let unsigned_obj = unsigned.as_object_mut()?;
2300 if unsigned_obj.remove("m.relations").is_some() {
2301 *event = Raw::new(&val).ok()?.cast_unchecked();
2302 }
2303 None
2304 };
2305 let _ = closure();
2306 }
2307
2308 fn strip_relations_from_event(ev: &mut Event) {
2309 match &mut ev.kind {
2310 TimelineEventKind::Decrypted(decrypted) => {
2311 decrypted.unsigned_encryption_info = None;
2314
2315 strip_relations_if_present(&mut decrypted.event);
2317 }
2318
2319 TimelineEventKind::UnableToDecrypt { event, .. }
2320 | TimelineEventKind::PlainText { event } => {
2321 strip_relations_if_present(event);
2322 }
2323 }
2324 }
2325
2326 fn strip_relations_from_events(items: &mut [Event]) {
2328 for ev in items.iter_mut() {
2329 strip_relations_from_event(ev);
2330 }
2331 }
2332
2333 async fn find_event(
2336 event_id: &EventId,
2337 room_id: &RoomId,
2338 room_linked_chunk: &EventLinkedChunk,
2339 store: &EventCacheStoreLockGuard,
2340 ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
2341 for (position, event) in room_linked_chunk.revents() {
2344 if event.event_id().as_deref() == Some(event_id) {
2345 return Ok(Some((EventLocation::Memory(position), event.clone())));
2346 }
2347 }
2348
2349 Ok(store.find_event(room_id, event_id).await?.map(|event| (EventLocation::Store, event)))
2350 }
2351
2352 async fn find_event_with_relations(
2356 event_id: &EventId,
2357 room_id: &RoomId,
2358 filters: Option<Vec<RelationType>>,
2359 room_linked_chunk: &EventLinkedChunk,
2360 store: &EventCacheStoreLockGuard,
2361 ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
2362 let found = store.find_event(room_id, event_id).await?;
2364
2365 let Some(target) = found else {
2366 return Ok(None);
2368 };
2369
2370 let related =
2372 find_event_relations(event_id, room_id, filters, room_linked_chunk, store).await?;
2373
2374 Ok(Some((target, related)))
2375 }
2376
2377 async fn find_event_relations(
2380 event_id: &EventId,
2381 room_id: &RoomId,
2382 filters: Option<Vec<RelationType>>,
2383 room_linked_chunk: &EventLinkedChunk,
2384 store: &EventCacheStoreLockGuard,
2385 ) -> Result<Vec<Event>, EventCacheError> {
2386 let mut related = store.find_event_relations(room_id, event_id, filters.as_deref()).await?;
2389 let mut stack =
2390 related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();
2391
2392 let mut already_seen = HashSet::new();
2395 already_seen.insert(event_id.to_owned());
2396
2397 let mut num_iters = 1;
2398
2399 while let Some(event_id) = stack.pop() {
2401 if !already_seen.insert(event_id.clone()) {
2402 continue;
2404 }
2405
2406 let other_related =
2407 store.find_event_relations(room_id, &event_id, filters.as_deref()).await?;
2408
2409 stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
2410 related.extend(other_related);
2411
2412 num_iters += 1;
2413 }
2414
2415 trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
2416
2417 related.sort_by(|(_, lhs), (_, rhs)| {
2421 use std::cmp::Ordering;
2422
2423 match (lhs, rhs) {
2424 (None, None) => Ordering::Equal,
2425 (None, Some(_)) => Ordering::Less,
2426 (Some(_), None) => Ordering::Greater,
2427 (Some(lhs), Some(rhs)) => {
2428 let lhs = room_linked_chunk.event_order(*lhs);
2429 let rhs = room_linked_chunk.event_order(*rhs);
2430
2431 match (lhs, rhs) {
2435 (None, None) => Ordering::Equal,
2436 (None, Some(_)) => Ordering::Less,
2437 (Some(_), None) => Ordering::Greater,
2438 (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
2439 }
2440 }
2441 }
2442 });
2443
2444 let related = related.into_iter().map(|(event, _pos)| event).collect();
2446
2447 Ok(related)
2448 }
2449}
2450
2451pub(super) enum EventLocation {
2453 Memory(Position),
2455
2456 Store,
2458}
2459
2460pub(super) use private::RoomEventCacheStateLock;
2461
2462#[cfg(test)]
2463mod tests {
2464 use matrix_sdk_base::event_cache::Event;
2465 use matrix_sdk_test::{async_test, event_factory::EventFactory};
2466 use ruma::{
2467 RoomId, event_id,
2468 events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2469 room_id, user_id,
2470 };
2471
2472 use crate::test_utils::logged_in_client;
2473
2474 #[async_test]
2475 async fn test_find_event_by_id_with_edit_relation() {
2476 let original_id = event_id!("$original");
2477 let related_id = event_id!("$related");
2478 let room_id = room_id!("!galette:saucisse.bzh");
2479 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2480
2481 assert_relations(
2482 room_id,
2483 f.text_msg("Original event").event_id(original_id).into(),
2484 f.text_msg("* An edited event")
2485 .edit(
2486 original_id,
2487 RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
2488 )
2489 .event_id(related_id)
2490 .into(),
2491 f,
2492 )
2493 .await;
2494 }
2495
2496 #[async_test]
2497 async fn test_find_event_by_id_with_thread_reply_relation() {
2498 let original_id = event_id!("$original");
2499 let related_id = event_id!("$related");
2500 let room_id = room_id!("!galette:saucisse.bzh");
2501 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2502
2503 assert_relations(
2504 room_id,
2505 f.text_msg("Original event").event_id(original_id).into(),
2506 f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
2507 f,
2508 )
2509 .await;
2510 }
2511
2512 #[async_test]
2513 async fn test_find_event_by_id_with_reaction_relation() {
2514 let original_id = event_id!("$original");
2515 let related_id = event_id!("$related");
2516 let room_id = room_id!("!galette:saucisse.bzh");
2517 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2518
2519 assert_relations(
2520 room_id,
2521 f.text_msg("Original event").event_id(original_id).into(),
2522 f.reaction(original_id, ":D").event_id(related_id).into(),
2523 f,
2524 )
2525 .await;
2526 }
2527
2528 #[async_test]
2529 async fn test_find_event_by_id_with_poll_response_relation() {
2530 let original_id = event_id!("$original");
2531 let related_id = event_id!("$related");
2532 let room_id = room_id!("!galette:saucisse.bzh");
2533 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2534
2535 assert_relations(
2536 room_id,
2537 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2538 .event_id(original_id)
2539 .into(),
2540 f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
2541 f,
2542 )
2543 .await;
2544 }
2545
2546 #[async_test]
2547 async fn test_find_event_by_id_with_poll_end_relation() {
2548 let original_id = event_id!("$original");
2549 let related_id = event_id!("$related");
2550 let room_id = room_id!("!galette:saucisse.bzh");
2551 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2552
2553 assert_relations(
2554 room_id,
2555 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2556 .event_id(original_id)
2557 .into(),
2558 f.poll_end("Poll ended", original_id).event_id(related_id).into(),
2559 f,
2560 )
2561 .await;
2562 }
2563
2564 #[async_test]
2565 async fn test_find_event_by_id_with_filtered_relationships() {
2566 let original_id = event_id!("$original");
2567 let related_id = event_id!("$related");
2568 let associated_related_id = event_id!("$recursive_related");
2569 let room_id = room_id!("!galette:saucisse.bzh");
2570 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2571
2572 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2573 let related_event = event_factory
2574 .text_msg("* Edited event")
2575 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2576 .event_id(related_id)
2577 .into();
2578 let associated_related_event =
2579 event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
2580
2581 let client = logged_in_client(None).await;
2582
2583 let event_cache = client.event_cache();
2584 event_cache.subscribe().unwrap();
2585
2586 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2587 let room = client.get_room(room_id).unwrap();
2588
2589 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2590
2591 room_event_cache.save_events([original_event]).await;
2593
2594 room_event_cache.save_events([related_event]).await;
2596
2597 room_event_cache.save_events([associated_related_event]).await;
2599
2600 let filter = Some(vec![RelationType::Replacement]);
2601 let (event, related_events) = room_event_cache
2602 .find_event_with_relations(original_id, filter)
2603 .await
2604 .expect("Failed to find the event with relations")
2605 .expect("Event has no relation");
2606 let cached_event_id = event.event_id().unwrap();
2608 assert_eq!(cached_event_id, original_id);
2609
2610 assert_eq!(related_events.len(), 1);
2612
2613 let related_event_id = related_events[0].event_id().unwrap();
2614 assert_eq!(related_event_id, related_id);
2615
2616 let filter = Some(vec![RelationType::Thread]);
2618 let (event, related_events) = room_event_cache
2619 .find_event_with_relations(original_id, filter)
2620 .await
2621 .expect("Failed to find the event with relations")
2622 .expect("Event has no relation");
2623
2624 let cached_event_id = event.event_id().unwrap();
2626 assert_eq!(cached_event_id, original_id);
2627 assert!(related_events.is_empty());
2629 }
2630
2631 #[async_test]
2632 async fn test_find_event_by_id_with_recursive_relation() {
2633 let original_id = event_id!("$original");
2634 let related_id = event_id!("$related");
2635 let associated_related_id = event_id!("$recursive_related");
2636 let room_id = room_id!("!galette:saucisse.bzh");
2637 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2638
2639 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2640 let related_event = event_factory
2641 .text_msg("* Edited event")
2642 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2643 .event_id(related_id)
2644 .into();
2645 let associated_related_event =
2646 event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
2647
2648 let client = logged_in_client(None).await;
2649
2650 let event_cache = client.event_cache();
2651 event_cache.subscribe().unwrap();
2652
2653 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2654 let room = client.get_room(room_id).unwrap();
2655
2656 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2657
2658 room_event_cache.save_events([original_event]).await;
2660
2661 room_event_cache.save_events([related_event]).await;
2663
2664 room_event_cache.save_events([associated_related_event]).await;
2666
2667 let (event, related_events) = room_event_cache
2668 .find_event_with_relations(original_id, None)
2669 .await
2670 .expect("Failed to find the event with relations")
2671 .expect("Event has no relation");
2672 let cached_event_id = event.event_id().unwrap();
2674 assert_eq!(cached_event_id, original_id);
2675
2676 assert_eq!(related_events.len(), 2);
2678
2679 let related_event_id = related_events[0].event_id().unwrap();
2680 assert_eq!(related_event_id, related_id);
2681 let related_event_id = related_events[1].event_id().unwrap();
2682 assert_eq!(related_event_id, associated_related_id);
2683 }
2684
2685 async fn assert_relations(
2686 room_id: &RoomId,
2687 original_event: Event,
2688 related_event: Event,
2689 event_factory: EventFactory,
2690 ) {
2691 let client = logged_in_client(None).await;
2692
2693 let event_cache = client.event_cache();
2694 event_cache.subscribe().unwrap();
2695
2696 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2697 let room = client.get_room(room_id).unwrap();
2698
2699 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2700
2701 let original_event_id = original_event.event_id().unwrap();
2703 room_event_cache.save_events([original_event]).await;
2704
2705 let unrelated_id = event_id!("$2");
2707 room_event_cache
2708 .save_events([event_factory
2709 .text_msg("An unrelated event")
2710 .event_id(unrelated_id)
2711 .into()])
2712 .await;
2713
2714 let related_id = related_event.event_id().unwrap();
2716 room_event_cache.save_events([related_event]).await;
2717
2718 let (event, related_events) = room_event_cache
2719 .find_event_with_relations(&original_event_id, None)
2720 .await
2721 .expect("Failed to find the event with relations")
2722 .expect("Event has no relation");
2723 let cached_event_id = event.event_id().unwrap();
2725 assert_eq!(cached_event_id, original_event_id);
2726
2727 let related_event_id = related_events[0].event_id().unwrap();
2729 assert_eq!(related_event_id, related_id);
2730 }
2731}
2732
2733#[cfg(all(test, not(target_family = "wasm")))] mod timed_tests {
2735 use std::{ops::Not, sync::Arc};
2736
2737 use assert_matches::assert_matches;
2738 use assert_matches2::assert_let;
2739 use eyeball_im::VectorDiff;
2740 use futures_util::FutureExt;
2741 use matrix_sdk_base::{
2742 event_cache::{
2743 Gap,
2744 store::{EventCacheStore as _, MemoryStore},
2745 },
2746 linked_chunk::{
2747 ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
2748 lazy_loader::from_all_chunks,
2749 },
2750 store::StoreConfig,
2751 sync::{JoinedRoomUpdate, Timeline},
2752 };
2753 use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
2754 use ruma::{
2755 EventId, OwnedUserId, event_id,
2756 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2757 room_id, user_id,
2758 };
2759 use tokio::task::yield_now;
2760
2761 use super::RoomEventCacheGenericUpdate;
2762 use crate::{
2763 assert_let_timeout,
2764 event_cache::{RoomEventCache, RoomEventCacheUpdate, room::LoadMoreEventsBackwardsOutcome},
2765 test_utils::client::MockClientBuilder,
2766 };
2767
2768 #[async_test]
2769 async fn test_write_to_storage() {
2770 let room_id = room_id!("!galette:saucisse.bzh");
2771 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2772
2773 let event_cache_store = Arc::new(MemoryStore::new());
2774
2775 let client = MockClientBuilder::new(None)
2776 .on_builder(|builder| {
2777 builder.store_config(
2778 StoreConfig::new("hodlor".to_owned())
2779 .event_cache_store(event_cache_store.clone()),
2780 )
2781 })
2782 .build()
2783 .await;
2784
2785 let event_cache = client.event_cache();
2786
2787 event_cache.subscribe().unwrap();
2789
2790 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2791 let room = client.get_room(room_id).unwrap();
2792
2793 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2794 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2795
2796 let timeline = Timeline {
2798 limited: true,
2799 prev_batch: Some("raclette".to_owned()),
2800 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
2801 };
2802
2803 room_event_cache
2804 .inner
2805 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2806 .await
2807 .unwrap();
2808
2809 assert_matches!(
2811 generic_stream.recv().await,
2812 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2813 assert_eq!(expected_room_id, room_id);
2814 }
2815 );
2816 assert!(generic_stream.is_empty());
2817
2818 let linked_chunk = from_all_chunks::<3, _, _>(
2820 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2821 )
2822 .unwrap()
2823 .unwrap();
2824
2825 assert_eq!(linked_chunk.chunks().count(), 2);
2826
2827 let mut chunks = linked_chunk.chunks();
2828
2829 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
2831 assert_eq!(gap.prev_token, "raclette");
2832 });
2833
2834 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2836 assert_eq!(events.len(), 1);
2837 let deserialized = events[0].raw().deserialize().unwrap();
2838 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
2839 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
2840 });
2841
2842 assert!(chunks.next().is_none());
2844 }
2845
2846 #[async_test]
2847 async fn test_write_to_storage_strips_bundled_relations() {
2848 let room_id = room_id!("!galette:saucisse.bzh");
2849 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2850
2851 let event_cache_store = Arc::new(MemoryStore::new());
2852
2853 let client = MockClientBuilder::new(None)
2854 .on_builder(|builder| {
2855 builder.store_config(
2856 StoreConfig::new("hodlor".to_owned())
2857 .event_cache_store(event_cache_store.clone()),
2858 )
2859 })
2860 .build()
2861 .await;
2862
2863 let event_cache = client.event_cache();
2864
2865 event_cache.subscribe().unwrap();
2867
2868 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2869 let room = client.get_room(room_id).unwrap();
2870
2871 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2872 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2873
2874 let ev = f
2876 .text_msg("hey yo")
2877 .sender(*ALICE)
2878 .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
2879 .into_event();
2880
2881 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
2882
2883 room_event_cache
2884 .inner
2885 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2886 .await
2887 .unwrap();
2888
2889 assert_matches!(
2891 generic_stream.recv().await,
2892 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2893 assert_eq!(expected_room_id, room_id);
2894 }
2895 );
2896 assert!(generic_stream.is_empty());
2897
2898 {
2900 let events = room_event_cache.events().await.unwrap();
2901
2902 assert_eq!(events.len(), 1);
2903
2904 let ev = events[0].raw().deserialize().unwrap();
2905 assert_let!(
2906 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
2907 );
2908
2909 let original = msg.as_original().unwrap();
2910 assert_eq!(original.content.body(), "hey yo");
2911 assert!(original.unsigned.relations.replace.is_some());
2912 }
2913
2914 let linked_chunk = from_all_chunks::<3, _, _>(
2916 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2917 )
2918 .unwrap()
2919 .unwrap();
2920
2921 assert_eq!(linked_chunk.chunks().count(), 1);
2922
2923 let mut chunks = linked_chunk.chunks();
2924 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2925 assert_eq!(events.len(), 1);
2926
2927 let ev = events[0].raw().deserialize().unwrap();
2928 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
2929
2930 let original = msg.as_original().unwrap();
2931 assert_eq!(original.content.body(), "hey yo");
2932 assert!(original.unsigned.relations.replace.is_none());
2933 });
2934
2935 assert!(chunks.next().is_none());
2937 }
2938
2939 #[async_test]
2940 async fn test_clear() {
2941 let room_id = room_id!("!galette:saucisse.bzh");
2942 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2943
2944 let event_cache_store = Arc::new(MemoryStore::new());
2945
2946 let event_id1 = event_id!("$1");
2947 let event_id2 = event_id!("$2");
2948
2949 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2950 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2951
2952 event_cache_store
2954 .handle_linked_chunk_updates(
2955 LinkedChunkId::Room(room_id),
2956 vec![
2957 Update::NewItemsChunk {
2959 previous: None,
2960 new: ChunkIdentifier::new(0),
2961 next: None,
2962 },
2963 Update::NewGapChunk {
2965 previous: Some(ChunkIdentifier::new(0)),
2966 new: ChunkIdentifier::new(42),
2968 next: None,
2969 gap: Gap { prev_token: "comté".to_owned() },
2970 },
2971 Update::NewItemsChunk {
2973 previous: Some(ChunkIdentifier::new(42)),
2974 new: ChunkIdentifier::new(1),
2975 next: None,
2976 },
2977 Update::PushItems {
2978 at: Position::new(ChunkIdentifier::new(1), 0),
2979 items: vec![ev1.clone()],
2980 },
2981 Update::NewItemsChunk {
2983 previous: Some(ChunkIdentifier::new(1)),
2984 new: ChunkIdentifier::new(2),
2985 next: None,
2986 },
2987 Update::PushItems {
2988 at: Position::new(ChunkIdentifier::new(2), 0),
2989 items: vec![ev2.clone()],
2990 },
2991 ],
2992 )
2993 .await
2994 .unwrap();
2995
2996 let client = MockClientBuilder::new(None)
2997 .on_builder(|builder| {
2998 builder.store_config(
2999 StoreConfig::new("hodlor".to_owned())
3000 .event_cache_store(event_cache_store.clone()),
3001 )
3002 })
3003 .build()
3004 .await;
3005
3006 let event_cache = client.event_cache();
3007
3008 event_cache.subscribe().unwrap();
3010
3011 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3012 let room = client.get_room(room_id).unwrap();
3013
3014 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3015
3016 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
3017 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3018
3019 {
3021 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
3022 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
3023 }
3024
3025 {
3027 assert_eq!(items.len(), 1);
3029 assert_eq!(items[0].event_id().unwrap(), event_id2);
3030
3031 assert!(stream.is_empty());
3032 }
3033
3034 {
3036 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3037
3038 assert_let_timeout!(
3039 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3040 );
3041 assert_eq!(diffs.len(), 1);
3042 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
3043 assert_eq!(event.event_id().unwrap(), event_id1);
3045 });
3046
3047 assert!(stream.is_empty());
3048
3049 assert_let_timeout!(
3050 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
3051 generic_stream.recv()
3052 );
3053 assert_eq!(room_id, expected_room_id);
3054 assert!(generic_stream.is_empty());
3055 }
3056
3057 room_event_cache.clear().await.unwrap();
3059
3060 assert_let_timeout!(
3062 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3063 );
3064 assert_eq!(diffs.len(), 1);
3065 assert_let!(VectorDiff::Clear = &diffs[0]);
3066
3067 assert_let_timeout!(
3069 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
3070 );
3071 assert_eq!(received_room_id, room_id);
3072 assert!(generic_stream.is_empty());
3073
3074 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
3077
3078 let items = room_event_cache.events().await.unwrap();
3080 assert!(items.is_empty());
3081
3082 let linked_chunk = from_all_chunks::<3, _, _>(
3084 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
3085 )
3086 .unwrap()
3087 .unwrap();
3088
3089 assert_eq!(linked_chunk.num_items(), 0);
3093 }
3094
3095 #[async_test]
3096 async fn test_load_from_storage() {
3097 let room_id = room_id!("!galette:saucisse.bzh");
3098 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
3099
3100 let event_cache_store = Arc::new(MemoryStore::new());
3101
3102 let event_id1 = event_id!("$1");
3103 let event_id2 = event_id!("$2");
3104
3105 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
3106 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
3107
3108 event_cache_store
3110 .handle_linked_chunk_updates(
3111 LinkedChunkId::Room(room_id),
3112 vec![
3113 Update::NewItemsChunk {
3115 previous: None,
3116 new: ChunkIdentifier::new(0),
3117 next: None,
3118 },
3119 Update::NewGapChunk {
3121 previous: Some(ChunkIdentifier::new(0)),
3122 new: ChunkIdentifier::new(42),
3124 next: None,
3125 gap: Gap { prev_token: "cheddar".to_owned() },
3126 },
3127 Update::NewItemsChunk {
3129 previous: Some(ChunkIdentifier::new(42)),
3130 new: ChunkIdentifier::new(1),
3131 next: None,
3132 },
3133 Update::PushItems {
3134 at: Position::new(ChunkIdentifier::new(1), 0),
3135 items: vec![ev1.clone()],
3136 },
3137 Update::NewItemsChunk {
3139 previous: Some(ChunkIdentifier::new(1)),
3140 new: ChunkIdentifier::new(2),
3141 next: None,
3142 },
3143 Update::PushItems {
3144 at: Position::new(ChunkIdentifier::new(2), 0),
3145 items: vec![ev2.clone()],
3146 },
3147 ],
3148 )
3149 .await
3150 .unwrap();
3151
3152 let client = MockClientBuilder::new(None)
3153 .on_builder(|builder| {
3154 builder.store_config(
3155 StoreConfig::new("hodlor".to_owned())
3156 .event_cache_store(event_cache_store.clone()),
3157 )
3158 })
3159 .build()
3160 .await;
3161
3162 let event_cache = client.event_cache();
3163
3164 event_cache.subscribe().unwrap();
3166
3167 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3169
3170 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3171 let room = client.get_room(room_id).unwrap();
3172
3173 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3174
3175 assert_matches!(
3178 generic_stream.recv().await,
3179 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3180 assert_eq!(room_id, expected_room_id);
3181 }
3182 );
3183 assert!(generic_stream.is_empty());
3184
3185 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
3186
3187 assert_eq!(items.len(), 1);
3190 assert_eq!(items[0].event_id().unwrap(), event_id2);
3191 assert!(stream.is_empty());
3192
3193 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
3195 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
3196
3197 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3199
3200 assert_let_timeout!(
3201 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3202 );
3203 assert_eq!(diffs.len(), 1);
3204 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
3205 assert_eq!(event.event_id().unwrap(), event_id1);
3206 });
3207
3208 assert!(stream.is_empty());
3209
3210 assert_matches!(
3212 generic_stream.recv().await,
3213 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3214 assert_eq!(expected_room_id, room_id);
3215 }
3216 );
3217 assert!(generic_stream.is_empty());
3218
3219 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
3221
3222 room_event_cache
3223 .inner
3224 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
3225 .await
3226 .unwrap();
3227
3228 assert!(generic_stream.recv().now_or_never().is_none());
3231
3232 let items = room_event_cache.events().await.unwrap();
3237 assert_eq!(items.len(), 2);
3238 assert_eq!(items[0].event_id().unwrap(), event_id1);
3239 assert_eq!(items[1].event_id().unwrap(), event_id2);
3240 }
3241
3242 #[async_test]
3243 async fn test_load_from_storage_resilient_to_failure() {
3244 let room_id = room_id!("!fondue:patate.ch");
3245 let event_cache_store = Arc::new(MemoryStore::new());
3246
3247 let event = EventFactory::new()
3248 .room(room_id)
3249 .sender(user_id!("@ben:saucisse.bzh"))
3250 .text_msg("foo")
3251 .event_id(event_id!("$42"))
3252 .into_event();
3253
3254 event_cache_store
3256 .handle_linked_chunk_updates(
3257 LinkedChunkId::Room(room_id),
3258 vec![
3259 Update::NewItemsChunk {
3260 previous: None,
3261 new: ChunkIdentifier::new(0),
3262 next: None,
3263 },
3264 Update::PushItems {
3265 at: Position::new(ChunkIdentifier::new(0), 0),
3266 items: vec![event],
3267 },
3268 Update::NewItemsChunk {
3269 previous: Some(ChunkIdentifier::new(0)),
3270 new: ChunkIdentifier::new(1),
3271 next: Some(ChunkIdentifier::new(0)),
3272 },
3273 ],
3274 )
3275 .await
3276 .unwrap();
3277
3278 let client = MockClientBuilder::new(None)
3279 .on_builder(|builder| {
3280 builder.store_config(
3281 StoreConfig::new("holder".to_owned())
3282 .event_cache_store(event_cache_store.clone()),
3283 )
3284 })
3285 .build()
3286 .await;
3287
3288 let event_cache = client.event_cache();
3289
3290 event_cache.subscribe().unwrap();
3292
3293 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3294 let room = client.get_room(room_id).unwrap();
3295
3296 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3297
3298 let items = room_event_cache.events().await.unwrap();
3299
3300 assert!(items.is_empty());
3303
3304 let raw_chunks =
3307 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
3308 assert!(raw_chunks.is_empty());
3309 }
3310
3311 #[async_test]
3312 async fn test_no_useless_gaps() {
3313 let room_id = room_id!("!galette:saucisse.bzh");
3314
3315 let client = MockClientBuilder::new(None).build().await;
3316
3317 let event_cache = client.event_cache();
3318 event_cache.subscribe().unwrap();
3319
3320 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3321 let room = client.get_room(room_id).unwrap();
3322 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3323 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3324
3325 let f = EventFactory::new().room(room_id).sender(*ALICE);
3326
3327 room_event_cache
3330 .inner
3331 .handle_joined_room_update(JoinedRoomUpdate {
3332 timeline: Timeline {
3333 limited: true,
3334 prev_batch: Some("raclette".to_owned()),
3335 events: vec![f.text_msg("hey yo").into_event()],
3336 },
3337 ..Default::default()
3338 })
3339 .await
3340 .unwrap();
3341
3342 assert_matches!(
3344 generic_stream.recv().await,
3345 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3346 assert_eq!(expected_room_id, room_id);
3347 }
3348 );
3349 assert!(generic_stream.is_empty());
3350
3351 {
3352 let mut state = room_event_cache.inner.state.write().await.unwrap();
3353
3354 let mut num_gaps = 0;
3355 let mut num_events = 0;
3356
3357 for c in state.room_linked_chunk().chunks() {
3358 match c.content() {
3359 ChunkContent::Items(items) => num_events += items.len(),
3360 ChunkContent::Gap(_) => num_gaps += 1,
3361 }
3362 }
3363
3364 assert_eq!(num_gaps, 0);
3367 assert_eq!(num_events, 1);
3368
3369 assert_matches!(
3371 state.load_more_events_backwards().await.unwrap(),
3372 LoadMoreEventsBackwardsOutcome::Gap { .. }
3373 );
3374
3375 num_gaps = 0;
3376 num_events = 0;
3377 for c in state.room_linked_chunk().chunks() {
3378 match c.content() {
3379 ChunkContent::Items(items) => num_events += items.len(),
3380 ChunkContent::Gap(_) => num_gaps += 1,
3381 }
3382 }
3383
3384 assert_eq!(num_gaps, 1);
3386 assert_eq!(num_events, 1);
3387 }
3388
3389 room_event_cache
3392 .inner
3393 .handle_joined_room_update(JoinedRoomUpdate {
3394 timeline: Timeline {
3395 limited: false,
3396 prev_batch: Some("fondue".to_owned()),
3397 events: vec![f.text_msg("sup").into_event()],
3398 },
3399 ..Default::default()
3400 })
3401 .await
3402 .unwrap();
3403
3404 assert_matches!(
3406 generic_stream.recv().await,
3407 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3408 assert_eq!(expected_room_id, room_id);
3409 }
3410 );
3411 assert!(generic_stream.is_empty());
3412
3413 {
3414 let state = room_event_cache.inner.state.read().await.unwrap();
3415
3416 let mut num_gaps = 0;
3417 let mut num_events = 0;
3418
3419 for c in state.room_linked_chunk().chunks() {
3420 match c.content() {
3421 ChunkContent::Items(items) => num_events += items.len(),
3422 ChunkContent::Gap(gap) => {
3423 assert_eq!(gap.prev_token, "raclette");
3424 num_gaps += 1;
3425 }
3426 }
3427 }
3428
3429 assert_eq!(num_gaps, 1);
3431 assert_eq!(num_events, 2);
3432 }
3433 }
3434
3435 #[async_test]
3436 async fn test_shrink_to_last_chunk() {
3437 let room_id = room_id!("!galette:saucisse.bzh");
3438
3439 let client = MockClientBuilder::new(None).build().await;
3440
3441 let f = EventFactory::new().room(room_id);
3442
3443 let evid1 = event_id!("$1");
3444 let evid2 = event_id!("$2");
3445
3446 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3447 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3448
3449 {
3451 client
3452 .event_cache_store()
3453 .lock()
3454 .await
3455 .expect("Could not acquire the event cache lock")
3456 .as_clean()
3457 .expect("Could not acquire a clean event cache lock")
3458 .handle_linked_chunk_updates(
3459 LinkedChunkId::Room(room_id),
3460 vec![
3461 Update::NewItemsChunk {
3462 previous: None,
3463 new: ChunkIdentifier::new(0),
3464 next: None,
3465 },
3466 Update::PushItems {
3467 at: Position::new(ChunkIdentifier::new(0), 0),
3468 items: vec![ev1],
3469 },
3470 Update::NewItemsChunk {
3471 previous: Some(ChunkIdentifier::new(0)),
3472 new: ChunkIdentifier::new(1),
3473 next: None,
3474 },
3475 Update::PushItems {
3476 at: Position::new(ChunkIdentifier::new(1), 0),
3477 items: vec![ev2],
3478 },
3479 ],
3480 )
3481 .await
3482 .unwrap();
3483 }
3484
3485 let event_cache = client.event_cache();
3486 event_cache.subscribe().unwrap();
3487
3488 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3489 let room = client.get_room(room_id).unwrap();
3490 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3491
3492 let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
3494 assert_eq!(events.len(), 1);
3495 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3496 assert!(stream.is_empty());
3497
3498 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3499
3500 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3502 assert_eq!(outcome.events.len(), 1);
3503 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3504 assert!(outcome.reached_start);
3505
3506 assert_let_timeout!(
3508 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3509 );
3510 assert_eq!(diffs.len(), 1);
3511 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3512 assert_eq!(value.event_id().as_deref(), Some(evid1));
3513 });
3514
3515 assert!(stream.is_empty());
3516
3517 assert_let_timeout!(
3519 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
3520 );
3521 assert_eq!(expected_room_id, room_id);
3522 assert!(generic_stream.is_empty());
3523
3524 let diffs = room_event_cache
3526 .inner
3527 .state
3528 .write()
3529 .await
3530 .unwrap()
3531 .force_shrink_to_last_chunk()
3532 .await
3533 .expect("shrinking should succeed");
3534
3535 assert_eq!(diffs.len(), 2);
3537 assert_matches!(&diffs[0], VectorDiff::Clear);
3538 assert_matches!(&diffs[1], VectorDiff::Append { values} => {
3539 assert_eq!(values.len(), 1);
3540 assert_eq!(values[0].event_id().as_deref(), Some(evid2));
3541 });
3542
3543 assert!(stream.is_empty());
3544
3545 assert!(generic_stream.is_empty());
3547
3548 let events = room_event_cache.events().await.unwrap();
3550 assert_eq!(events.len(), 1);
3551 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3552
3553 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3556 assert_eq!(outcome.events.len(), 1);
3557 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3558 assert!(outcome.reached_start);
3559 }
3560
3561 #[async_test]
3562 async fn test_room_ordering() {
3563 let room_id = room_id!("!galette:saucisse.bzh");
3564
3565 let client = MockClientBuilder::new(None).build().await;
3566
3567 let f = EventFactory::new().room(room_id).sender(*ALICE);
3568
3569 let evid1 = event_id!("$1");
3570 let evid2 = event_id!("$2");
3571 let evid3 = event_id!("$3");
3572
3573 let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
3574 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3575 let ev3 = f.text_msg("yo").event_id(evid3).into_event();
3576
3577 {
3579 client
3580 .event_cache_store()
3581 .lock()
3582 .await
3583 .expect("Could not acquire the event cache lock")
3584 .as_clean()
3585 .expect("Could not acquire a clean event cache lock")
3586 .handle_linked_chunk_updates(
3587 LinkedChunkId::Room(room_id),
3588 vec![
3589 Update::NewItemsChunk {
3590 previous: None,
3591 new: ChunkIdentifier::new(0),
3592 next: None,
3593 },
3594 Update::PushItems {
3595 at: Position::new(ChunkIdentifier::new(0), 0),
3596 items: vec![ev1, ev2],
3597 },
3598 Update::NewItemsChunk {
3599 previous: Some(ChunkIdentifier::new(0)),
3600 new: ChunkIdentifier::new(1),
3601 next: None,
3602 },
3603 Update::PushItems {
3604 at: Position::new(ChunkIdentifier::new(1), 0),
3605 items: vec![ev3.clone()],
3606 },
3607 ],
3608 )
3609 .await
3610 .unwrap();
3611 }
3612
3613 let event_cache = client.event_cache();
3614 event_cache.subscribe().unwrap();
3615
3616 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3617 let room = client.get_room(room_id).unwrap();
3618 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3619
3620 {
3623 let state = room_event_cache.inner.state.read().await.unwrap();
3624 let room_linked_chunk = state.room_linked_chunk();
3625
3626 assert_eq!(
3628 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
3629 Some(0)
3630 );
3631
3632 assert_eq!(
3634 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
3635 Some(1)
3636 );
3637
3638 let mut events = room_linked_chunk.events();
3640 let (pos, ev) = events.next().unwrap();
3641 assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
3642 assert_eq!(ev.event_id().as_deref(), Some(evid3));
3643 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
3644
3645 assert!(events.next().is_none());
3647 }
3648
3649 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3651 assert!(outcome.reached_start);
3652
3653 {
3656 let state = room_event_cache.inner.state.read().await.unwrap();
3657 let room_linked_chunk = state.room_linked_chunk();
3658
3659 for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
3660 assert_eq!(room_linked_chunk.event_order(pos), Some(i));
3661 }
3662 }
3663
3664 let evid4 = event_id!("$4");
3669 room_event_cache
3670 .inner
3671 .handle_joined_room_update(JoinedRoomUpdate {
3672 timeline: Timeline {
3673 limited: true,
3674 prev_batch: Some("fondue".to_owned()),
3675 events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
3676 },
3677 ..Default::default()
3678 })
3679 .await
3680 .unwrap();
3681
3682 {
3683 let state = room_event_cache.inner.state.read().await.unwrap();
3684 let room_linked_chunk = state.room_linked_chunk();
3685
3686 let mut events = room_linked_chunk.events();
3688
3689 let (pos, ev) = events.next().unwrap();
3690 assert_eq!(ev.event_id().as_deref(), Some(evid3));
3691 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
3692
3693 let (pos, ev) = events.next().unwrap();
3694 assert_eq!(ev.event_id().as_deref(), Some(evid4));
3695 assert_eq!(room_linked_chunk.event_order(pos), Some(3));
3696
3697 assert!(events.next().is_none());
3699
3700 assert_eq!(
3702 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
3703 Some(0)
3704 );
3705 assert_eq!(
3706 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
3707 Some(1)
3708 );
3709
3710 assert_eq!(
3713 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
3714 None
3715 );
3716 }
3717 }
3718
3719 #[async_test]
3720 async fn test_auto_shrink_after_all_subscribers_are_gone() {
3721 let room_id = room_id!("!galette:saucisse.bzh");
3722
3723 let client = MockClientBuilder::new(None).build().await;
3724
3725 let f = EventFactory::new().room(room_id);
3726
3727 let evid1 = event_id!("$1");
3728 let evid2 = event_id!("$2");
3729
3730 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3731 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3732
3733 {
3735 client
3736 .event_cache_store()
3737 .lock()
3738 .await
3739 .expect("Could not acquire the event cache lock")
3740 .as_clean()
3741 .expect("Could not acquire a clean event cache lock")
3742 .handle_linked_chunk_updates(
3743 LinkedChunkId::Room(room_id),
3744 vec![
3745 Update::NewItemsChunk {
3746 previous: None,
3747 new: ChunkIdentifier::new(0),
3748 next: None,
3749 },
3750 Update::PushItems {
3751 at: Position::new(ChunkIdentifier::new(0), 0),
3752 items: vec![ev1],
3753 },
3754 Update::NewItemsChunk {
3755 previous: Some(ChunkIdentifier::new(0)),
3756 new: ChunkIdentifier::new(1),
3757 next: None,
3758 },
3759 Update::PushItems {
3760 at: Position::new(ChunkIdentifier::new(1), 0),
3761 items: vec![ev2],
3762 },
3763 ],
3764 )
3765 .await
3766 .unwrap();
3767 }
3768
3769 let event_cache = client.event_cache();
3770 event_cache.subscribe().unwrap();
3771
3772 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3773 let room = client.get_room(room_id).unwrap();
3774 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3775
3776 let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
3778 assert_eq!(events1.len(), 1);
3779 assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
3780 assert!(stream1.is_empty());
3781
3782 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3783
3784 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3786 assert_eq!(outcome.events.len(), 1);
3787 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3788 assert!(outcome.reached_start);
3789
3790 assert_let_timeout!(
3793 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
3794 );
3795 assert_eq!(diffs.len(), 1);
3796 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3797 assert_eq!(value.event_id().as_deref(), Some(evid1));
3798 });
3799
3800 assert!(stream1.is_empty());
3801
3802 assert_let_timeout!(
3803 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
3804 );
3805 assert_eq!(expected_room_id, room_id);
3806 assert!(generic_stream.is_empty());
3807
3808 let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
3812 assert_eq!(events2.len(), 2);
3813 assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
3814 assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
3815 assert!(stream2.is_empty());
3816
3817 drop(stream1);
3819 yield_now().await;
3820
3821 assert!(stream2.is_empty());
3823
3824 drop(stream2);
3826 yield_now().await;
3827
3828 {
3831 let state = room_event_cache.inner.state.read().await.unwrap();
3833 assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
3834 }
3835
3836 let events3 = room_event_cache.events().await.unwrap();
3838 assert_eq!(events3.len(), 1);
3839 assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
3840 }
3841
3842 #[async_test]
3843 async fn test_rfind_map_event_in_memory_by() {
3844 let user_id = user_id!("@mnt_io:matrix.org");
3845 let room_id = room_id!("!raclette:patate.ch");
3846 let client = MockClientBuilder::new(None).build().await;
3847
3848 let event_factory = EventFactory::new().room(room_id);
3849
3850 let event_id_0 = event_id!("$ev0");
3851 let event_id_1 = event_id!("$ev1");
3852 let event_id_2 = event_id!("$ev2");
3853 let event_id_3 = event_id!("$ev3");
3854
3855 let event_0 =
3856 event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3857 let event_1 =
3858 event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3859 let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3860 let event_3 =
3861 event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3862
3863 {
3866 client
3867 .event_cache_store()
3868 .lock()
3869 .await
3870 .expect("Could not acquire the event cache lock")
3871 .as_clean()
3872 .expect("Could not acquire a clean event cache lock")
3873 .handle_linked_chunk_updates(
3874 LinkedChunkId::Room(room_id),
3875 vec![
3876 Update::NewItemsChunk {
3877 previous: None,
3878 new: ChunkIdentifier::new(0),
3879 next: None,
3880 },
3881 Update::PushItems {
3882 at: Position::new(ChunkIdentifier::new(0), 0),
3883 items: vec![event_3],
3884 },
3885 Update::NewItemsChunk {
3886 previous: Some(ChunkIdentifier::new(0)),
3887 new: ChunkIdentifier::new(1),
3888 next: None,
3889 },
3890 Update::PushItems {
3891 at: Position::new(ChunkIdentifier::new(1), 0),
3892 items: vec![event_0, event_1, event_2],
3893 },
3894 ],
3895 )
3896 .await
3897 .unwrap();
3898 }
3899
3900 let event_cache = client.event_cache();
3901 event_cache.subscribe().unwrap();
3902
3903 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3904 let room = client.get_room(room_id).unwrap();
3905 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3906
3907 assert_matches!(
3909 room_event_cache
3910 .rfind_map_event_in_memory_by(|event, previous_event| {
3911 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| (event.event_id(), previous_event.and_then(|event| event.event_id())))
3912 })
3913 .await,
3914 Ok(Some((event_id, previous_event_id))) => {
3915 assert_eq!(event_id.as_deref(), Some(event_id_0));
3916 assert!(previous_event_id.is_none());
3917 }
3918 );
3919
3920 assert_matches!(
3923 room_event_cache
3924 .rfind_map_event_in_memory_by(|event, previous_event| {
3925 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| (event.event_id(), previous_event.and_then(|event| event.event_id())))
3926 })
3927 .await,
3928 Ok(Some((event_id, previous_event_id))) => {
3929 assert_eq!(event_id.as_deref(), Some(event_id_2));
3930 assert_eq!(previous_event_id.as_deref(), Some(event_id_1));
3931 }
3932 );
3933
3934 assert!(
3936 room_event_cache
3937 .rfind_map_event_in_memory_by(|event, _| {
3938 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
3939 == Some(user_id))
3940 .then(|| event.event_id())
3941 })
3942 .await
3943 .unwrap()
3944 .is_none()
3945 );
3946
3947 assert!(
3949 room_event_cache
3950 .rfind_map_event_in_memory_by(|_, _| None::<()>)
3951 .await
3952 .unwrap()
3953 .is_none()
3954 );
3955 }
3956
3957 #[async_test]
3958 async fn test_reload_when_dirty() {
3959 let user_id = user_id!("@mnt_io:matrix.org");
3960 let room_id = room_id!("!raclette:patate.ch");
3961
3962 let event_cache_store = MemoryStore::new();
3964
3965 let client_p0 = MockClientBuilder::new(None)
3967 .on_builder(|builder| {
3968 builder.store_config(
3969 StoreConfig::new("process #0".to_owned())
3970 .event_cache_store(event_cache_store.clone()),
3971 )
3972 })
3973 .build()
3974 .await;
3975
3976 let client_p1 = MockClientBuilder::new(None)
3978 .on_builder(|builder| {
3979 builder.store_config(
3980 StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
3981 )
3982 })
3983 .build()
3984 .await;
3985
3986 let event_factory = EventFactory::new().room(room_id).sender(user_id);
3987
3988 let ev_id_0 = event_id!("$ev_0");
3989 let ev_id_1 = event_id!("$ev_1");
3990
3991 let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
3992 let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
3993
3994 client_p0
3996 .event_cache_store()
3997 .lock()
3998 .await
3999 .expect("[p0] Could not acquire the event cache lock")
4000 .as_clean()
4001 .expect("[p0] Could not acquire a clean event cache lock")
4002 .handle_linked_chunk_updates(
4003 LinkedChunkId::Room(room_id),
4004 vec![
4005 Update::NewItemsChunk {
4006 previous: None,
4007 new: ChunkIdentifier::new(0),
4008 next: None,
4009 },
4010 Update::PushItems {
4011 at: Position::new(ChunkIdentifier::new(0), 0),
4012 items: vec![ev_0],
4013 },
4014 Update::NewItemsChunk {
4015 previous: Some(ChunkIdentifier::new(0)),
4016 new: ChunkIdentifier::new(1),
4017 next: None,
4018 },
4019 Update::PushItems {
4020 at: Position::new(ChunkIdentifier::new(1), 0),
4021 items: vec![ev_1],
4022 },
4023 ],
4024 )
4025 .await
4026 .unwrap();
4027
4028 let (room_event_cache_p0, room_event_cache_p1) = {
4030 let event_cache_p0 = client_p0.event_cache();
4031 event_cache_p0.subscribe().unwrap();
4032
4033 let event_cache_p1 = client_p1.event_cache();
4034 event_cache_p1.subscribe().unwrap();
4035
4036 client_p0.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
4037 client_p1.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
4038
4039 let (room_event_cache_p0, _drop_handles) =
4040 client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
4041 let (room_event_cache_p1, _drop_handles) =
4042 client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
4043
4044 (room_event_cache_p0, room_event_cache_p1)
4045 };
4046
4047 let mut updates_stream_p0 = {
4052 let room_event_cache = &room_event_cache_p0;
4053
4054 let (initial_updates, mut updates_stream) =
4055 room_event_cache_p0.subscribe().await.unwrap();
4056
4057 assert_eq!(initial_updates.len(), 1);
4059 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
4060 assert!(updates_stream.is_empty());
4061
4062 assert!(event_loaded(room_event_cache, ev_id_1).await);
4064
4065 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4067
4068 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4070
4071 assert_matches!(
4073 updates_stream.recv().await.unwrap(),
4074 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4075 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4076 assert_matches!(
4077 &diffs[0],
4078 VectorDiff::Insert { index: 0, value: event } => {
4079 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4080 }
4081 );
4082 }
4083 );
4084
4085 assert!(event_loaded(room_event_cache, ev_id_0).await);
4087
4088 updates_stream
4089 };
4090
4091 let mut updates_stream_p1 = {
4093 let room_event_cache = &room_event_cache_p1;
4094 let (initial_updates, mut updates_stream) =
4095 room_event_cache_p1.subscribe().await.unwrap();
4096
4097 assert_eq!(initial_updates.len(), 1);
4099 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
4100 assert!(updates_stream.is_empty());
4101
4102 assert!(event_loaded(room_event_cache, ev_id_1).await);
4104
4105 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4107
4108 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4110
4111 assert_matches!(
4113 updates_stream.recv().await.unwrap(),
4114 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4115 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4116 assert_matches!(
4117 &diffs[0],
4118 VectorDiff::Insert { index: 0, value: event } => {
4119 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4120 }
4121 );
4122 }
4123 );
4124
4125 assert!(event_loaded(room_event_cache, ev_id_0).await);
4127
4128 updates_stream
4129 };
4130
4131 for _ in 0..3 {
4133 {
4137 let room_event_cache = &room_event_cache_p0;
4138 let updates_stream = &mut updates_stream_p0;
4139
4140 assert!(event_loaded(room_event_cache, ev_id_1).await);
4142
4143 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4146
4147 assert_matches!(
4149 updates_stream.recv().await.unwrap(),
4150 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4151 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4152 assert_matches!(&diffs[0], VectorDiff::Clear);
4153 assert_matches!(
4154 &diffs[1],
4155 VectorDiff::Append { values: events } => {
4156 assert_eq!(events.len(), 1);
4157 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4158 }
4159 );
4160 }
4161 );
4162
4163 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4165
4166 assert!(event_loaded(room_event_cache, ev_id_0).await);
4168
4169 assert_matches!(
4171 updates_stream.recv().await.unwrap(),
4172 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4173 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4174 assert_matches!(
4175 &diffs[0],
4176 VectorDiff::Insert { index: 0, value: event } => {
4177 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4178 }
4179 );
4180 }
4181 );
4182 }
4183
4184 {
4188 let room_event_cache = &room_event_cache_p1;
4189 let updates_stream = &mut updates_stream_p1;
4190
4191 assert!(event_loaded(room_event_cache, ev_id_1).await);
4193
4194 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4197
4198 assert_matches!(
4200 updates_stream.recv().await.unwrap(),
4201 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4202 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4203 assert_matches!(&diffs[0], VectorDiff::Clear);
4204 assert_matches!(
4205 &diffs[1],
4206 VectorDiff::Append { values: events } => {
4207 assert_eq!(events.len(), 1);
4208 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4209 }
4210 );
4211 }
4212 );
4213
4214 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4216
4217 assert!(event_loaded(room_event_cache, ev_id_0).await);
4219
4220 assert_matches!(
4222 updates_stream.recv().await.unwrap(),
4223 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4224 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4225 assert_matches!(
4226 &diffs[0],
4227 VectorDiff::Insert { index: 0, value: event } => {
4228 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4229 }
4230 );
4231 }
4232 );
4233 }
4234 }
4235
4236 for _ in 0..3 {
4239 {
4240 let room_event_cache = &room_event_cache_p0;
4241 let updates_stream = &mut updates_stream_p0;
4242
4243 let guard = room_event_cache.inner.state.read().await.unwrap();
4244
4245 assert!(guard.is_dirty().not());
4251
4252 assert_matches!(
4254 updates_stream.recv().await.unwrap(),
4255 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4256 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4257 assert_matches!(&diffs[0], VectorDiff::Clear);
4258 assert_matches!(
4259 &diffs[1],
4260 VectorDiff::Append { values: events } => {
4261 assert_eq!(events.len(), 1);
4262 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4263 }
4264 );
4265 }
4266 );
4267
4268 assert!(event_loaded(room_event_cache, ev_id_1).await);
4269 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4270
4271 drop(guard);
4277
4278 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4279 assert!(event_loaded(room_event_cache, ev_id_0).await);
4280
4281 assert_matches!(
4283 updates_stream.recv().await.unwrap(),
4284 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4285 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4286 assert_matches!(
4287 &diffs[0],
4288 VectorDiff::Insert { index: 0, value: event } => {
4289 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4290 }
4291 );
4292 }
4293 );
4294 }
4295
4296 {
4297 let room_event_cache = &room_event_cache_p1;
4298 let updates_stream = &mut updates_stream_p1;
4299
4300 let guard = room_event_cache.inner.state.read().await.unwrap();
4301
4302 assert!(guard.is_dirty().not());
4307
4308 assert_matches!(
4310 updates_stream.recv().await.unwrap(),
4311 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4312 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4313 assert_matches!(&diffs[0], VectorDiff::Clear);
4314 assert_matches!(
4315 &diffs[1],
4316 VectorDiff::Append { values: events } => {
4317 assert_eq!(events.len(), 1);
4318 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4319 }
4320 );
4321 }
4322 );
4323
4324 assert!(event_loaded(room_event_cache, ev_id_1).await);
4325 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4326
4327 drop(guard);
4333
4334 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4335 assert!(event_loaded(room_event_cache, ev_id_0).await);
4336
4337 assert_matches!(
4339 updates_stream.recv().await.unwrap(),
4340 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4341 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4342 assert_matches!(
4343 &diffs[0],
4344 VectorDiff::Insert { index: 0, value: event } => {
4345 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4346 }
4347 );
4348 }
4349 );
4350 }
4351 }
4352
4353 for _ in 0..3 {
4355 {
4356 let room_event_cache = &room_event_cache_p0;
4357 let updates_stream = &mut updates_stream_p0;
4358
4359 let guard = room_event_cache.inner.state.write().await.unwrap();
4360
4361 assert!(guard.is_dirty().not());
4363
4364 assert_matches!(
4366 updates_stream.recv().await.unwrap(),
4367 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4368 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4369 assert_matches!(&diffs[0], VectorDiff::Clear);
4370 assert_matches!(
4371 &diffs[1],
4372 VectorDiff::Append { values: events } => {
4373 assert_eq!(events.len(), 1);
4374 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4375 }
4376 );
4377 }
4378 );
4379
4380 drop(guard);
4383
4384 assert!(event_loaded(room_event_cache, ev_id_1).await);
4385 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4386
4387 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4388 assert!(event_loaded(room_event_cache, ev_id_0).await);
4389
4390 assert_matches!(
4392 updates_stream.recv().await.unwrap(),
4393 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4394 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4395 assert_matches!(
4396 &diffs[0],
4397 VectorDiff::Insert { index: 0, value: event } => {
4398 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4399 }
4400 );
4401 }
4402 );
4403 }
4404
4405 {
4406 let room_event_cache = &room_event_cache_p1;
4407 let updates_stream = &mut updates_stream_p1;
4408
4409 let guard = room_event_cache.inner.state.write().await.unwrap();
4410
4411 assert!(guard.is_dirty().not());
4413
4414 assert_matches!(
4416 updates_stream.recv().await.unwrap(),
4417 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4418 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4419 assert_matches!(&diffs[0], VectorDiff::Clear);
4420 assert_matches!(
4421 &diffs[1],
4422 VectorDiff::Append { values: events } => {
4423 assert_eq!(events.len(), 1);
4424 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4425 }
4426 );
4427 }
4428 );
4429
4430 drop(guard);
4433
4434 assert!(event_loaded(room_event_cache, ev_id_1).await);
4435 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4436
4437 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4438 assert!(event_loaded(room_event_cache, ev_id_0).await);
4439
4440 assert_matches!(
4442 updates_stream.recv().await.unwrap(),
4443 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4444 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4445 assert_matches!(
4446 &diffs[0],
4447 VectorDiff::Insert { index: 0, value: event } => {
4448 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4449 }
4450 );
4451 }
4452 );
4453 }
4454 }
4455 }
4456
4457 #[async_test]
4458 async fn test_load_when_dirty() {
4459 let room_id_0 = room_id!("!raclette:patate.ch");
4460 let room_id_1 = room_id!("!morbiflette:patate.ch");
4461
4462 let event_cache_store = MemoryStore::new();
4464
4465 let client_p0 = MockClientBuilder::new(None)
4467 .on_builder(|builder| {
4468 builder.store_config(
4469 StoreConfig::new("process #0".to_owned())
4470 .event_cache_store(event_cache_store.clone()),
4471 )
4472 })
4473 .build()
4474 .await;
4475
4476 let client_p1 = MockClientBuilder::new(None)
4478 .on_builder(|builder| {
4479 builder.store_config(
4480 StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
4481 )
4482 })
4483 .build()
4484 .await;
4485
4486 let (room_event_cache_0_p0, room_event_cache_0_p1) = {
4488 let event_cache_p0 = client_p0.event_cache();
4489 event_cache_p0.subscribe().unwrap();
4490
4491 let event_cache_p1 = client_p1.event_cache();
4492 event_cache_p1.subscribe().unwrap();
4493
4494 client_p0
4495 .base_client()
4496 .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4497 client_p0
4498 .base_client()
4499 .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4500
4501 client_p1
4502 .base_client()
4503 .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4504 client_p1
4505 .base_client()
4506 .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4507
4508 let (room_event_cache_0_p0, _drop_handles) =
4509 client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4510 let (room_event_cache_0_p1, _drop_handles) =
4511 client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4512
4513 (room_event_cache_0_p0, room_event_cache_0_p1)
4514 };
4515
4516 {
4518 drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
4519 drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
4520 }
4521
4522 let (room_event_cache_1_p0, _) =
4526 client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
4527
4528 {
4530 let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
4531 assert!(guard.is_dirty().not());
4532 }
4533
4534 }
4537
4538 async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
4539 room_event_cache
4540 .rfind_map_event_in_memory_by(|event, _previous_event_id| {
4541 (event.event_id().as_deref() == Some(event_id)).then_some(())
4542 })
4543 .await
4544 .unwrap()
4545 .is_some()
4546 }
4547}