use std::{
    collections::BTreeMap,
    fmt,
    ops::{Deref, DerefMut},
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use events::sort_positions_descending;
use eyeball::SharedObservable;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
    deserialized_responses::AmbiguityChange,
    event_cache::Event,
    linked_chunk::Position,
    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
    EventId, OwnedEventId, OwnedRoomId,
    api::Direction,
    events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
    serde::Raw,
};
use tokio::sync::{
    Notify, RwLock,
    broadcast::{Receiver, Sender},
    mpsc,
};
use tracing::{instrument, trace, warn};

use super::{
    AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
    RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
};
use crate::{
    client::WeakClient,
    event_cache::EventCacheError,
    room::{IncludeRelations, RelationsOptions, WeakRoom},
};

pub(super) mod events;
mod threads;

pub use threads::ThreadEventCacheUpdate;

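/// A room-scoped view over the event cache.
///
/// It is cheap to clone: the shared state lives behind an `Arc`.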
#[derive(Clone)]
pub struct RoomEventCache {
    pub(super) inner: Arc<RoomEventCacheInner>,
}

impl fmt::Debug for RoomEventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RoomEventCache").finish_non_exhaustive()
    }
}

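/// A subscriber to a [`RoomEventCache`].
///
/// It dereferences to the underlying broadcast [`Receiver`]. When the last
/// subscriber for a room is dropped, a notification is sent on the
/// auto-shrink channel so the room's linked chunk can be shrunk back to its
/// last chunk.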
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheSubscriber {
    recv: Receiver<RoomEventCacheUpdate>,

    room_id: OwnedRoomId,

    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    subscriber_count: Arc<AtomicUsize>,
}

impl Drop for RoomEventCacheSubscriber {
    fn drop(&mut self) {
        let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);

        trace!(
            "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
        );

        if previous_subscriber_count == 1 {
            let mut room_id = self.room_id.clone();

            let mut num_attempts = 0;

            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
                num_attempts += 1;

                if num_attempts > 1024 {
                    warn!(
                        "couldn't send notification to the auto-shrink channel \
                         after 1024 attempts; giving up"
                    );
                    return;
                }

                match err {
                    mpsc::error::TrySendError::Full(stolen_room_id) => {
                        room_id = stolen_room_id;
                    }
                    mpsc::error::TrySendError::Closed(_) => return,
                }
            }

            trace!("sent notification to the parent channel that we were the last subscriber");
        }
    }
}

impl Deref for RoomEventCacheSubscriber {
    type Target = Receiver<RoomEventCacheUpdate>;

    fn deref(&self) -> &Self::Target {
        &self.recv
    }
}

impl DerefMut for RoomEventCacheSubscriber {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.recv
    }
}

impl RoomEventCache {
    pub(super) fn new(
        client: WeakClient,
        state: RoomEventCacheState,
        pagination_status: SharedObservable<RoomPaginationStatus>,
        room_id: OwnedRoomId,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
    ) -> Self {
        Self {
            inner: Arc::new(RoomEventCacheInner::new(
                client,
                state,
                pagination_status,
                room_id,
                auto_shrink_sender,
                generic_update_sender,
            )),
        }
    }

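    /// Returns the events currently held in the in-memory linked chunk for
    /// this room.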
    pub async fn events(&self) -> Vec<Event> {
        let state = self.inner.state.read().await;

        state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect()
    }

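    /// Returns the events currently held in memory, along with a
    /// [`RoomEventCacheSubscriber`] that receives further updates for this
    /// room.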
    pub async fn subscribe(&self) -> (Vec<Event>, RoomEventCacheSubscriber) {
        let state = self.inner.state.read().await;
        let events =
            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();

        let previous_subscriber_count = state.subscriber_count.fetch_add(1, Ordering::SeqCst);
        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);

        let recv = self.inner.sender.subscribe();
        let subscriber = RoomEventCacheSubscriber {
            recv,
            room_id: self.inner.room_id.clone(),
            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
            subscriber_count: state.subscriber_count.clone(),
        };

        (events, subscriber)
    }

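    /// Subscribes to updates for the thread rooted at `thread_root`,
    /// returning the thread events already known to the cache.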
    pub async fn subscribe_to_thread(
        &self,
        thread_root: OwnedEventId,
    ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
        let mut state = self.inner.state.write().await;
        state.subscribe_to_thread(thread_root)
    }

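    /// Paginates a thread backwards, requesting up to `num_events` events per
    /// `/relations` request.
    ///
    /// Returns `Ok(true)` when the start of the thread has been reached.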
    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
    pub async fn paginate_thread_backwards(
        &self,
        thread_root: OwnedEventId,
        num_events: u16,
    ) -> Result<bool> {
        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;

        let mut outcome =
            self.inner.state.write().await.load_more_thread_events_backwards(thread_root.clone());

        loop {
            match outcome {
                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
                    let options = RelationsOptions {
                        from: prev_token.clone(),
                        dir: Direction::Backward,
                        limit: Some(num_events.into()),
                        include_relations: IncludeRelations::AllRelations,
                        recurse: true,
                    };

                    let mut result = room
                        .relations(thread_root.clone(), options)
                        .await
                        .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

                    let reached_start = result.next_batch_token.is_none();
                    trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");

                    if reached_start {
                        let root_event = room
                            .load_or_fetch_event(&thread_root, None)
                            .await
                            .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

                        result.chunk.push(root_event);
                    }

                    let mut state = self.inner.state.write().await;

                    if let Some(outcome) = state.finish_thread_network_pagination(
                        thread_root.clone(),
                        prev_token,
                        result.next_batch_token,
                        result.chunk,
                    ) {
                        return Ok(outcome.reached_start);
                    }

                    outcome = state.load_more_thread_events_backwards(thread_root.clone());
                }

                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
                    return Ok(true);
                }

                LoadMoreEventsBackwardsOutcome::Events { .. } => {
                    unimplemented!("loading from disk for threads is not implemented yet");
                }

                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
                    unreachable!("unused for threads")
                }
            }
        }
    }

    pub fn pagination(&self) -> RoomPagination {
        RoomPagination { inner: self.inner.clone() }
    }

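    /// Searches the in-memory events from the most recent to the oldest,
    /// returning the first value produced by `predicate`.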
    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Option<O>
    where
        P: FnMut(&Event) -> Option<O>,
    {
        self.inner.state.read().await.rfind_map_event_in_memory_by(predicate)
    }

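    /// Looks up a single event by its id, first in memory, then in the store.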
    pub async fn find_event(&self, event_id: &EventId) -> Option<Event> {
        self.inner
            .state
            .read()
            .await
            .find_event(event_id)
            .await
            .ok()
            .flatten()
            .map(|(_loc, event)| event)
    }

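    /// Looks up an event and the transitive closure of events related to it,
    /// optionally filtered by relation type.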
    pub async fn find_event_with_relations(
        &self,
        event_id: &EventId,
        filter: Option<Vec<RelationType>>,
    ) -> Option<(Event, Vec<Event>)> {
        self.inner
            .state
            .read()
            .await
            .find_event_with_relations(event_id, filter.clone())
            .await
            .ok()
            .flatten()
    }

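    /// Clears all events from this room's cache and resets its storage,
    /// notifying subscribers with a `Clear` diff.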
    pub async fn clear(&self) -> Result<()> {
        let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;

        let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
            diffs: updates_as_vector_diffs,
            origin: EventsOrigin::Cache,
        });

        let _ = self
            .inner
            .generic_update_sender
            .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });

        Ok(())
    }

    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
        if let Err(err) = self.inner.state.write().await.save_event(events).await {
            warn!("couldn't save event in the event cache: {err}");
        }
    }

    pub async fn debug_string(&self) -> Vec<String> {
        self.inner.state.read().await.room_linked_chunk().debug_string()
    }
}

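/// The shared, inner state of a [`RoomEventCache`].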
pub(super) struct RoomEventCacheInner {
    pub(super) room_id: OwnedRoomId,

    pub weak_room: WeakRoom,

    pub sender: Sender<RoomEventCacheUpdate>,

    pub state: RwLock<RoomEventCacheState>,

    pub pagination_batch_token_notifier: Notify,

    pub pagination_status: SharedObservable<RoomPaginationStatus>,

    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
}

impl RoomEventCacheInner {
    fn new(
        client: WeakClient,
        state: RoomEventCacheState,
        pagination_status: SharedObservable<RoomPaginationStatus>,
        room_id: OwnedRoomId,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
    ) -> Self {
        let sender = Sender::new(32);
        let weak_room = WeakRoom::new(client, room_id);
        Self {
            room_id: weak_room.room_id().to_owned(),
            weak_room,
            state: RwLock::new(state),
            sender,
            pagination_batch_token_notifier: Default::default(),
            auto_shrink_sender,
            pagination_status,
            generic_update_sender,
        }
    }

    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
        if account_data.is_empty() {
            return;
        }

        let mut handled_read_marker = false;

        trace!("Handling account data");

        for raw_event in account_data {
            match raw_event.deserialize() {
                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
                    if handled_read_marker {
                        continue;
                    }

                    handled_read_marker = true;

                    let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
                        event_id: ev.content.event_id,
                    });
                }

                Ok(_) => {}

                Err(e) => {
                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
                    warn!(event_type, "Failed to deserialize account data: {e}");
                }
            }
        }
    }

    #[instrument(skip_all, fields(room_id = %self.room_id))]
    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
        self.handle_timeline(
            updates.timeline,
            updates.ephemeral.clone(),
            updates.ambiguity_changes,
        )
        .await?;
        self.handle_account_data(updates.account_data);

        Ok(())
    }

    #[instrument(skip_all, fields(room_id = %self.room_id))]
    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
        self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;

        Ok(())
    }

    async fn handle_timeline(
        &self,
        timeline: Timeline,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        if timeline.events.is_empty()
            && timeline.prev_batch.is_none()
            && ephemeral_events.is_empty()
            && ambiguity_changes.is_empty()
        {
            return Ok(());
        }

        trace!("adding new events");

        let (stored_prev_batch_token, timeline_event_diffs) =
            self.state.write().await.handle_sync(timeline).await?;

        if stored_prev_batch_token {
            self.pagination_batch_token_notifier.notify_one();
        }

        if !timeline_event_diffs.is_empty() {
            let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: timeline_event_diffs,
                origin: EventsOrigin::Sync,
            });

            let _ = self
                .generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
        }

        if !ephemeral_events.is_empty() {
            let _ = self
                .sender
                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
        }

        if !ambiguity_changes.is_empty() {
            let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
        }

        Ok(())
    }
}

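/// The possible outcomes of a single attempt to load more events backwards:
/// a gap to fill over the network, the start of the timeline, or events
/// loaded from the store.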
#[derive(Debug)]
pub(super) enum LoadMoreEventsBackwardsOutcome {
    Gap {
        prev_token: Option<String>,
    },

    StartOfTimeline,

    Events {
        events: Vec<Event>,
        timeline_event_diffs: Vec<VectorDiff<Event>>,
        reached_start: bool,
    },

    WaitForInitialPrevToken,
}

mod private {
    use std::{
        collections::{BTreeMap, HashMap, HashSet},
        sync::{Arc, atomic::AtomicUsize},
    };

    use eyeball::SharedObservable;
    use eyeball_im::VectorDiff;
    use matrix_sdk_base::{
        apply_redaction,
        deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
        event_cache::{
            Event, Gap,
            store::{DynEventCacheStore, EventCacheStoreLock},
        },
        linked_chunk::{
            ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
            OwnedLinkedChunkId, Position, Update,
            lazy_loader::{self},
        },
        serde_helpers::extract_thread_root,
        sync::Timeline,
    };
    use matrix_sdk_common::executor::spawn;
    use ruma::{
        EventId, OwnedEventId, OwnedRoomId,
        events::{
            AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
            relation::RelationType, room::redaction::SyncRoomRedactionEvent,
        },
        room_version_rules::RoomVersionRules,
        serde::Raw,
    };
    use tokio::sync::broadcast::{Receiver, Sender};
    use tracing::{debug, error, instrument, trace, warn};

    use super::{
        super::{EventCacheError, deduplicator::DeduplicationOutcome},
        EventLocation, LoadMoreEventsBackwardsOutcome,
        events::EventLinkedChunk,
        sort_positions_descending,
    };
    use crate::event_cache::{
        BackPaginationOutcome, RoomEventCacheLinkedChunkUpdate, RoomPaginationStatus,
        ThreadEventCacheUpdate, deduplicator::filter_duplicate_events,
        room::threads::ThreadEventCache,
    };

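    /// The mutable state of a room's event cache: the room's linked chunk of
    /// events, the per-thread caches, and the handles needed to persist
    /// updates to the store.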
    pub struct RoomEventCacheState {
        room: OwnedRoomId,

        room_version_rules: RoomVersionRules,

        enabled_thread_support: bool,

        store: EventCacheStoreLock,

        room_linked_chunk: EventLinkedChunk,

        threads: HashMap<OwnedEventId, ThreadEventCache>,

        pub waited_for_initial_prev_token: bool,

        pagination_status: SharedObservable<RoomPaginationStatus>,

        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

        pub(super) subscriber_count: Arc<AtomicUsize>,
    }

    impl RoomEventCacheState {
        pub async fn new(
            room_id: OwnedRoomId,
            room_version_rules: RoomVersionRules,
            enabled_thread_support: bool,
            linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
            store: EventCacheStoreLock,
            pagination_status: SharedObservable<RoomPaginationStatus>,
        ) -> Result<Self, EventCacheError> {
            let store_lock = store.lock().await?;

            let linked_chunk_id = LinkedChunkId::Room(&room_id);

            let full_linked_chunk_metadata =
                match Self::load_linked_chunk_metadata(&*store_lock, linked_chunk_id).await {
                    Ok(metas) => metas,
                    Err(err) => {
                        error!(
                            "error when loading a linked chunk's metadata from the store: {err}"
                        );

                        store_lock
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        None
                    }
                };

            let linked_chunk = match store_lock
                .load_last_chunk(linked_chunk_id)
                .await
                .map_err(EventCacheError::from)
                .and_then(|(last_chunk, chunk_identifier_generator)| {
                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
                        .map_err(EventCacheError::from)
                }) {
                Ok(linked_chunk) => linked_chunk,
                Err(err) => {
                    error!(
                        "error when loading a linked chunk's latest chunk from the store: {err}"
                    );

                    store_lock
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    None
                }
            };

            let room_linked_chunk = EventLinkedChunk::with_initial_linked_chunk(
                linked_chunk,
                full_linked_chunk_metadata,
            );

            let threads = HashMap::new();

            Ok(Self {
                room: room_id,
                room_version_rules,
                enabled_thread_support,
                store,
                room_linked_chunk,
                threads,
                waited_for_initial_prev_token: false,
                subscriber_count: Default::default(),
                pagination_status,
                linked_chunk_update_sender,
            })
        }

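        /// Loads the metadata of all chunks of this linked chunk from the
        /// store, checks that it forms a single non-cyclic chain, and returns
        /// it ordered from first to last chunk.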
        async fn load_linked_chunk_metadata(
            store: &DynEventCacheStore,
            linked_chunk_id: LinkedChunkId<'_>,
        ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
            let mut all_chunks = store
                .load_all_chunks_metadata(linked_chunk_id)
                .await
                .map_err(EventCacheError::from)?;

            if all_chunks.is_empty() {
                return Ok(None);
            }

            let chunk_map: HashMap<_, _> =
                all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();

            let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
            let Some(last) = iter.next() else {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: "no last chunk found".to_owned(),
                });
            };

            if let Some(other_last) = iter.next() {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "chunks {} and {} both claim to be last chunks",
                        last.identifier.index(),
                        other_last.identifier.index()
                    ),
                });
            }

            let mut seen = HashSet::new();
            let mut current = last;
            loop {
                if !seen.insert(current.identifier) {
                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
                        details: format!(
                            "cycle detected in linked chunk at {}",
                            current.identifier.index()
                        ),
                    });
                }

                let Some(prev_id) = current.previous else {
                    if seen.len() != all_chunks.len() {
                        return Err(EventCacheError::InvalidLinkedChunkMetadata {
                            details: format!(
                                "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
                                seen.len(),
                                all_chunks.len()
                            ),
                        });
                    }
                    break;
                };

                let Some(pred_meta) = chunk_map.get(&prev_id) else {
                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
                        details: format!(
                            "missing predecessor {} chunk for {}",
                            prev_id.index(),
                            current.identifier.index()
                        ),
                    });
                };

                if pred_meta.next != Some(current.identifier) {
                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
                        details: format!(
                            "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
                            pred_meta.identifier.index(),
                            pred_meta.next.map(|chunk_id| chunk_id.index()),
                            current.identifier.index()
                        ),
                    });
                }

                current = *pred_meta;
            }

            let mut current = current.identifier;
            for i in 0..all_chunks.len() {
                let j = all_chunks
                    .iter()
                    .rev()
                    .position(|meta| meta.identifier == current)
                    .map(|j| all_chunks.len() - 1 - j)
                    .expect("the target chunk must be present in the metadata");
                if i != j {
                    all_chunks.swap(i, j);
                }
                if let Some(next) = all_chunks[i].next {
                    current = next;
                }
            }

            Ok(Some(all_chunks))
        }

        fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome {
            if self.room_linked_chunk.events().next().is_some() {
                trace!("chunk is fully loaded and non-empty: reached_start=true");
                LoadMoreEventsBackwardsOutcome::StartOfTimeline
            } else if !self.waited_for_initial_prev_token {
                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken
            } else {
                LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
            }
        }

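        /// Loads more events backwards into the in-memory linked chunk,
        /// either by returning an existing gap to fill over the network, or by
        /// loading the previous chunk from the store.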
        pub(in super::super) async fn load_more_events_backwards(
            &mut self,
        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
            if let Some(prev_token) = self.room_linked_chunk.rgap().map(|gap| gap.prev_token) {
                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
            }

            let first_chunk_identifier = self
                .room_linked_chunk
                .chunks()
                .next()
                .expect("a linked chunk is never empty")
                .identifier();

            let store = self.store.lock().await?;

            let linked_chunk_id = LinkedChunkId::Room(&self.room);
            let new_first_chunk = match store
                .load_previous_chunk(linked_chunk_id, first_chunk_identifier)
                .await
            {
                Ok(Some(new_first_chunk)) => new_first_chunk,

                Ok(None) => {
                    return Ok(self.conclude_load_more_for_fully_loaded_chunk());
                }

                Err(err) => {
                    error!("error when loading the previous chunk of a linked chunk: {err}");

                    store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;

                    return Err(err.into());
                }
            };

            let chunk_content = new_first_chunk.content.clone();

            let reached_start = new_first_chunk.previous.is_none();

            if let Err(err) = self.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk) {
                error!("error when inserting the previous chunk into its linked chunk: {err}");

                store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;

                return Err(err.into());
            }

            let _ = self.room_linked_chunk.store_updates().take();

            let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();

            Ok(match chunk_content {
                ChunkContent::Gap(gap) => {
                    trace!("reloaded chunk from disk (gap)");
                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
                }

                ChunkContent::Items(events) => {
                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
                    LoadMoreEventsBackwardsOutcome::Events {
                        events,
                        timeline_event_diffs,
                        reached_start,
                    }
                }
            })
        }

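        /// Unloads the in-memory linked chunk, reloading only its last chunk
        /// from the store.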
        pub(super) async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
            let store_lock = self.store.lock().await?;

            let linked_chunk_id = LinkedChunkId::Room(&self.room);
            let (last_chunk, chunk_identifier_generator) =
                match store_lock.load_last_chunk(linked_chunk_id).await {
                    Ok(pair) => pair,

                    Err(err) => {
                        error!("error when reloading a linked chunk from memory: {err}");

                        store_lock
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        (None, ChunkIdentifierGenerator::new_from_scratch())
                    }
                };

            debug!("unloading the linked chunk, and resetting it to its last chunk");

            if let Err(err) =
                self.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
            {
                error!("error when replacing the linked chunk: {err}");
                return self.reset_internal().await;
            }

            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            let _ = self.room_linked_chunk.store_updates().take();

            Ok(())
        }

        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub(crate) async fn auto_shrink_if_no_subscribers(
            &mut self,
        ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
            let subscriber_count = self.subscriber_count.load(std::sync::atomic::Ordering::SeqCst);

            trace!(subscriber_count, "received request to auto-shrink");

            if subscriber_count == 0 {
                self.shrink_to_last_chunk().await?;
                Ok(Some(self.room_linked_chunk.updates_as_vector_diffs()))
            } else {
                Ok(None)
            }
        }

        #[cfg(test)]
        pub(crate) async fn force_shrink_to_last_chunk(
            &mut self,
        ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
            self.shrink_to_last_chunk().await?;
            Ok(self.room_linked_chunk.updates_as_vector_diffs())
        }

        pub(crate) fn room_event_order(&self, event_pos: Position) -> Option<usize> {
            self.room_linked_chunk.event_order(event_pos)
        }

        fn strip_relations_if_present<T>(event: &mut Raw<T>) {
            let mut closure = || -> Option<()> {
                let mut val: serde_json::Value = event.deserialize_as().ok()?;
                let unsigned = val.get_mut("unsigned")?;
                let unsigned_obj = unsigned.as_object_mut()?;
                if unsigned_obj.remove("m.relations").is_some() {
                    *event = Raw::new(&val).ok()?.cast_unchecked();
                }
                None
            };
            let _ = closure();
        }

        fn strip_relations_from_event(ev: &mut Event) {
            match &mut ev.kind {
                TimelineEventKind::Decrypted(decrypted) => {
                    decrypted.unsigned_encryption_info = None;

                    Self::strip_relations_if_present(&mut decrypted.event);
                }

                TimelineEventKind::UnableToDecrypt { event, .. }
                | TimelineEventKind::PlainText { event } => {
                    Self::strip_relations_if_present(event);
                }
            }
        }

        fn strip_relations_from_events(items: &mut [Event]) {
            for ev in items.iter_mut() {
                Self::strip_relations_from_event(ev);
            }
        }

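        /// Removes the given events from the in-memory linked chunk and from
        /// the store.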
        #[instrument(skip_all)]
        async fn remove_events(
            &mut self,
            in_memory_events: Vec<(OwnedEventId, Position)>,
            in_store_events: Vec<(OwnedEventId, Position)>,
        ) -> Result<(), EventCacheError> {
            if !in_store_events.is_empty() {
                let mut positions = in_store_events
                    .into_iter()
                    .map(|(_event_id, position)| position)
                    .collect::<Vec<_>>();

                sort_positions_descending(&mut positions);

                let updates = positions
                    .into_iter()
                    .map(|pos| Update::RemoveItem { at: pos })
                    .collect::<Vec<_>>();

                self.apply_store_only_updates(updates).await?;
            }

            if in_memory_events.is_empty() {
                return Ok(());
            }

            self.room_linked_chunk
                .remove_events_by_position(
                    in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
                )
                .expect("failed to remove an event");

            self.propagate_changes().await
        }

        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
            let updates = self.room_linked_chunk.store_updates().take();
            self.send_updates_to_store(updates).await
        }

        async fn apply_store_only_updates(
            &mut self,
            updates: Vec<Update<Event, Gap>>,
        ) -> Result<(), EventCacheError> {
            self.room_linked_chunk.order_tracker.map_updates(&updates);
            self.send_updates_to_store(updates).await
        }

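        /// Strips bundled relations from the events carried by the updates,
        /// persists the updates to the store on a background task, and
        /// forwards them on the linked chunk update channel.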
        async fn send_updates_to_store(
            &mut self,
            mut updates: Vec<Update<Event, Gap>>,
        ) -> Result<(), EventCacheError> {
            if updates.is_empty() {
                return Ok(());
            }

            for update in updates.iter_mut() {
                match update {
                    Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
                    Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
                    Update::NewItemsChunk { .. }
                    | Update::NewGapChunk { .. }
                    | Update::RemoveChunk(_)
                    | Update::RemoveItem { .. }
                    | Update::DetachLastItems { .. }
                    | Update::StartReattachItems
                    | Update::EndReattachItems
                    | Update::Clear => {}
                }
            }

            let store = self.store.clone();
            let room_id = self.room.clone();
            let cloned_updates = updates.clone();

            spawn(async move {
                let store = store.lock().await?;

                trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
                let linked_chunk_id = LinkedChunkId::Room(&room_id);
                store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
                trace!("linked chunk updates applied");

                super::Result::Ok(())
            })
            .await
            .expect("joining failed")?;

            let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
                linked_chunk_id: OwnedLinkedChunkId::Room(self.room.clone()),
                updates,
            });

            Ok(())
        }

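        /// Clears the room's linked chunk and all thread caches, and returns
        /// the resulting `VectorDiff` updates (a single `Clear`).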
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
            self.reset_internal().await?;

            let diff_updates = self.room_linked_chunk.updates_as_vector_diffs();

            debug_assert_eq!(diff_updates.len(), 1);
            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));

            Ok(diff_updates)
        }

        async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
            self.room_linked_chunk.reset();

            for thread in self.threads.values_mut() {
                thread.clear();
            }

            self.propagate_changes().await?;

            self.waited_for_initial_prev_token = false;
            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            Ok(())
        }

        pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
            &self.room_linked_chunk
        }

        pub fn rfind_map_event_in_memory_by<O, P>(&self, mut predicate: P) -> Option<O>
        where
            P: FnMut(&Event) -> Option<O>,
        {
            self.room_linked_chunk.revents().find_map(|(_position, event)| predicate(event))
        }

        pub async fn find_event(
            &self,
            event_id: &EventId,
        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
            for (position, event) in self.room_linked_chunk.revents() {
                if event.event_id().as_deref() == Some(event_id) {
                    return Ok(Some((EventLocation::Memory(position), event.clone())));
                }
            }

            let store = self.store.lock().await?;

            Ok(store
                .find_event(&self.room, event_id)
                .await?
                .map(|event| (EventLocation::Store, event)))
        }

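        /// Finds an event in the store and computes the transitive closure of
        /// its related events, optionally filtered by relation type. Related
        /// events are sorted in room (topological) order when known.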
        pub async fn find_event_with_relations(
            &self,
            event_id: &EventId,
            filters: Option<Vec<RelationType>>,
        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
            let store = self.store.lock().await?;

            let found = store.find_event(&self.room, event_id).await?;

            let Some(target) = found else {
                return Ok(None);
            };

            let mut related =
                store.find_event_relations(&self.room, event_id, filters.as_deref()).await?;
            let mut stack =
                related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();

            let mut already_seen = HashSet::new();
            already_seen.insert(event_id.to_owned());

            let mut num_iters = 1;

            while let Some(event_id) = stack.pop() {
                if !already_seen.insert(event_id.clone()) {
                    continue;
                }

                let other_related =
                    store.find_event_relations(&self.room, &event_id, filters.as_deref()).await?;

                stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
                related.extend(other_related);

                num_iters += 1;
            }

            trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");

            related.sort_by(|(_, lhs), (_, rhs)| {
                use std::cmp::Ordering;
                match (lhs, rhs) {
                    (None, None) => Ordering::Equal,
                    (None, Some(_)) => Ordering::Less,
                    (Some(_), None) => Ordering::Greater,
                    (Some(lhs), Some(rhs)) => {
                        let lhs = self.room_event_order(*lhs);
                        let rhs = self.room_event_order(*rhs);

                        match (lhs, rhs) {
                            (None, None) => Ordering::Equal,
                            (None, Some(_)) => Ordering::Less,
                            (Some(_), None) => Ordering::Greater,
                            (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
                        }
                    }
                }
            });

            let related = related.into_iter().map(|(event, _pos)| event).collect();

            Ok(Some((target, related)))
        }

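        /// Post-processes events newly added to the linked chunk: persists
        /// pending updates, applies redactions, dispatches the events to
        /// their thread caches, and saves bundled latest thread events.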
        async fn post_process_new_events(
            &mut self,
            events: Vec<Event>,
            is_sync: bool,
        ) -> Result<(), EventCacheError> {
            self.propagate_changes().await?;

            let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();

            for event in events {
                self.maybe_apply_new_redaction(&event).await?;

                if self.enabled_thread_support {
                    if let Some(thread_root) = extract_thread_root(event.raw()) {
                        new_events_by_thread.entry(thread_root).or_default().push(event.clone());
                    } else if let Some(event_id) = event.event_id() {
                        if self.threads.contains_key(&event_id) {
                            new_events_by_thread.entry(event_id).or_default().push(event.clone());
                        }
                    }
                }

                if let Some(bundled_thread) = event.bundled_latest_thread_event {
                    self.save_event([*bundled_thread]).await?;
                }
            }

            self.update_threads(new_events_by_thread, is_sync).await?;

            Ok(())
        }

        fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
            self.threads.entry(root_event_id.clone()).or_insert_with(|| {
                ThreadEventCache::new(
                    self.room.clone(),
                    root_event_id,
                    self.linked_chunk_update_sender.clone(),
                )
            })
        }

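        /// Pushes the new events onto their respective thread caches and
        /// recomputes the thread summaries of the affected thread roots.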
        #[instrument(skip_all)]
        async fn update_threads(
            &mut self,
            new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
            is_sync: bool,
        ) -> Result<(), EventCacheError> {
            for (thread_root, new_events) in new_events_by_thread {
                let thread_cache = self.get_or_reload_thread(thread_root.clone());

                if is_sync {
                    thread_cache.add_live_events(new_events);
                }

                let last_event_id = thread_cache.latest_event_id();

                let Some((location, mut target_event)) = self.find_event(&thread_root).await?
                else {
                    trace!(%thread_root, "thread root event is missing from the linked chunk");
                    continue;
                };

                let prev_summary = target_event.thread_summary.summary();
                let mut latest_reply =
                    prev_summary.as_ref().and_then(|summary| summary.latest_reply.clone());

                let num_replies = {
                    let store_guard = &*self.store.lock().await?;
                    let related_thread_events = store_guard
                        .find_event_relations(
                            &self.room,
                            &thread_root,
                            Some(&[RelationType::Thread]),
                        )
                        .await?;
                    related_thread_events.len().try_into().unwrap_or(u32::MAX)
                };

                if let Some(last_event_id) = last_event_id {
                    latest_reply = Some(last_event_id);
                }

                let new_summary = ThreadSummary { num_replies, latest_reply };

                if prev_summary == Some(&new_summary) {
                    trace!(%thread_root, "thread summary is already up-to-date");
                    continue;
                }

                target_event.thread_summary = ThreadSummaryStatus::Some(new_summary);
                self.replace_event_at(location, target_event).await?;
            }

            Ok(())
        }

        async fn replace_event_at(
            &mut self,
            location: EventLocation,
            event: Event,
        ) -> Result<(), EventCacheError> {
            match location {
                EventLocation::Memory(position) => {
                    self.room_linked_chunk
                        .replace_event_at(position, event)
                        .expect("should have been a valid position of an item");
                    self.propagate_changes().await?;
                }
                EventLocation::Store => {
                    self.save_event([event]).await?;
                }
            }

            Ok(())
        }

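        /// If the given event is a redaction, applies it to the redacted
        /// event, if that event is known to the cache.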
        #[instrument(skip_all)]
        async fn maybe_apply_new_redaction(
            &mut self,
            event: &Event,
        ) -> Result<(), EventCacheError> {
            let raw_event = event.raw();

            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
                raw_event.get_field::<MessageLikeEventType>("type")
            else {
                return Ok(());
            };

            let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
                redaction,
            ))) = raw_event.deserialize()
            else {
                return Ok(());
            };

            let Some(event_id) = redaction.redacts(&self.room_version_rules.redaction) else {
                warn!("missing target event id from the redaction event");
                return Ok(());
            };

            let Some((location, mut target_event)) = self.find_event(event_id).await? else {
                trace!("redacted event is missing from the linked chunk");
                return Ok(());
            };

            if let Ok(deserialized) = target_event.raw().deserialize() {
                match deserialized {
                    AnySyncTimelineEvent::MessageLike(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                    AnySyncTimelineEvent::State(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                }
            }

            if let Some(redacted_event) = apply_redaction(
                target_event.raw(),
                event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
                &self.room_version_rules.redaction,
            ) {
                target_event.replace_raw(redacted_event.cast_unchecked());

                self.replace_event_at(location, target_event).await?;
            }

            Ok(())
        }

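        /// Saves the given events as standalone entries in the store, without
        /// inserting them into the linked chunk.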
        pub async fn save_event(
            &self,
            events: impl IntoIterator<Item = Event>,
        ) -> Result<(), EventCacheError> {
            let store = self.store.clone();
            let room_id = self.room.clone();
            let events = events.into_iter().collect::<Vec<_>>();

            spawn(async move {
                let store = store.lock().await?;
                for event in events {
                    store.save_event(&room_id, event).await?;
                }
                super::Result::Ok(())
            })
            .await
            .expect("joining failed")?;

            Ok(())
        }

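        /// Handles the timeline part of a sync response: deduplicates the new
        /// events and pushes them onto the room's linked chunk, with a gap
        /// when a previous-batch token was provided.
        ///
        /// Returns whether a new gap was stored, along with the resulting
        /// `VectorDiff` updates.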
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn handle_sync(
            &mut self,
            mut timeline: Timeline,
        ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
            let mut prev_batch = timeline.prev_batch.take();

            let DeduplicationOutcome {
                all_events: events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
                non_empty_all_duplicates: all_duplicates,
            } = filter_duplicate_events(
                &self.store,
                LinkedChunkId::Room(self.room.as_ref()),
                &self.room_linked_chunk,
                timeline.events,
            )
            .await?;

            if !timeline.limited && self.room_linked_chunk.events().next().is_some()
                || all_duplicates
            {
                prev_batch = None;
            }

            if prev_batch.is_some() {
                let mut summaries_to_update = Vec::new();

                for (thread_root, thread) in self.threads.iter_mut() {
                    thread.clear();

                    summaries_to_update.push(thread_root.clone());
                }

                for thread_root in summaries_to_update {
                    let Some((location, mut target_event)) = self.find_event(&thread_root).await?
                    else {
                        trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
                        continue;
                    };

                    if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
                        prev_summary.latest_reply = None;

                        target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);

                        self.replace_event_at(location, target_event).await?;
                    }
                }
            }

            if all_duplicates {
                return Ok((false, Vec::new()));
            }

            let has_new_gap = prev_batch.is_some();

            if !self.waited_for_initial_prev_token && has_new_gap {
                self.waited_for_initial_prev_token = true;
            }

            self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;

            self.room_linked_chunk
                .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);

            self.post_process_new_events(events, true).await?;

            if timeline.limited && has_new_gap {
                self.shrink_to_last_chunk().await?;
            }

            let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();

            Ok((has_new_gap, timeline_event_diffs))
        }

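        /// Handles the result of a network back-pagination: resolves the gap
        /// matching `prev_token`, deduplicates the received events, inserts
        /// them into the linked chunk, and returns the pagination outcome
        /// along with the resulting `VectorDiff` updates.
        ///
        /// Returns `None` when the gap associated with `prev_token` could not
        /// be found anymore.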
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn handle_backpagination(
            &mut self,
            events: Vec<Event>,
            mut new_token: Option<String>,
            prev_token: Option<String>,
        ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
        {
            let prev_gap_id = if let Some(token) = prev_token {
                let gap_chunk_id = self.room_linked_chunk.chunk_identifier(|chunk| {
                    matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
                });

                if gap_chunk_id.is_none() {
                    return Ok(None);
                }

                gap_chunk_id
            } else {
                None
            };

            let DeduplicationOutcome {
                all_events: mut events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
                non_empty_all_duplicates: all_duplicates,
            } = filter_duplicate_events(
                &self.store,
                LinkedChunkId::Room(self.room.as_ref()),
                &self.room_linked_chunk,
                events,
            )
            .await?;

            if !all_duplicates {
                self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                    .await?;
            } else {
                events.clear();

                new_token = None;
            }

            let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();

            let new_gap = new_token.map(|prev_token| Gap { prev_token });
            let reached_start = self.room_linked_chunk.finish_back_pagination(
                prev_gap_id,
                new_gap,
                &topo_ordered_events,
            );

            self.post_process_new_events(topo_ordered_events, false).await?;

            let event_diffs = self.room_linked_chunk.updates_as_vector_diffs();

            Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
        }

        pub fn subscribe_to_thread(
            &mut self,
            root: OwnedEventId,
        ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
            self.get_or_reload_thread(root).subscribe()
        }

        pub fn finish_thread_network_pagination(
            &mut self,
            root: OwnedEventId,
            prev_token: Option<String>,
            new_token: Option<String>,
            events: Vec<Event>,
        ) -> Option<BackPaginationOutcome> {
            self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
        }

        pub fn load_more_thread_events_backwards(
            &mut self,
            root: OwnedEventId,
        ) -> LoadMoreEventsBackwardsOutcome {
            self.get_or_reload_thread(root).load_more_events_backwards()
        }
    }
}

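/// Where an event was found, when looked up via
/// [`RoomEventCacheState::find_event`].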
pub(super) enum EventLocation {
    Memory(Position),

    Store,
}

pub(super) use private::RoomEventCacheState;

#[cfg(test)]
mod tests {
    use matrix_sdk_base::event_cache::Event;
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        RoomId, event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };

    use crate::test_utils::logged_in_client;

    #[async_test]
    async fn test_find_event_by_id_with_edit_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("* An edited event")
                .edit(
                    original_id,
                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
                )
                .event_id(related_id)
                .into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_thread_reply_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_reaction_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.reaction(original_id, ":D").event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_poll_response_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_poll_end_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_filtered_relationships() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory =
            EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        room_event_cache.save_events([original_event]).await;

        room_event_cache.save_events([related_event]).await;

        room_event_cache.save_events([associated_related_event]).await;

        let filter = Some(vec![RelationType::Replacement]);
        let (event, related_events) =
            room_event_cache.find_event_with_relations(original_id, filter).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        assert_eq!(related_events.len(), 1);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);

        let filter = Some(vec![RelationType::Thread]);
        let (event, related_events) =
            room_event_cache.find_event_with_relations(original_id, filter).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);
        assert!(related_events.is_empty());
    }

    #[async_test]
    async fn test_find_event_by_id_with_recursive_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory =
            EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        room_event_cache.save_events([original_event]).await;

        room_event_cache.save_events([related_event]).await;

        room_event_cache.save_events([associated_related_event]).await;

        let (event, related_events) =
            room_event_cache.find_event_with_relations(original_id, None).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);
    }

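    /// Saves the `original_event`, an unrelated event and the `related_event`
    /// in the cache, then checks that `find_event_with_relations` returns the
    /// original event together with its related event only.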
    async fn assert_relations(
        room_id: &RoomId,
        original_event: Event,
        related_event: Event,
        event_factory: EventFactory,
    ) {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let original_event_id = original_event.event_id().unwrap();
        room_event_cache.save_events([original_event]).await;

        let unrelated_id = event_id!("$2");
        room_event_cache
            .save_events([event_factory
                .text_msg("An unrelated event")
                .event_id(unrelated_id)
                .into()])
            .await;

        let related_id = related_event.event_id().unwrap();
        room_event_cache.save_events([related_event]).await;

        let (event, related_events) =
            room_event_cache.find_event_with_relations(&original_event_id, None).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_event_id);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
    }
}

2215#[cfg(all(test, not(target_family = "wasm")))] mod timed_tests {
2217 use std::sync::Arc;
2218
2219 use assert_matches::assert_matches;
2220 use assert_matches2::assert_let;
2221 use eyeball_im::VectorDiff;
2222 use futures_util::FutureExt;
2223 use matrix_sdk_base::{
2224 event_cache::{
2225 Gap,
2226 store::{EventCacheStore as _, MemoryStore},
2227 },
2228 linked_chunk::{
2229 ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
2230 lazy_loader::from_all_chunks,
2231 },
2232 store::StoreConfig,
2233 sync::{JoinedRoomUpdate, Timeline},
2234 };
2235 use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
2236 use ruma::{
2237 OwnedUserId, event_id,
2238 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2239 room_id, user_id,
2240 };
2241 use tokio::task::yield_now;
2242
2243 use super::RoomEventCacheGenericUpdate;
2244 use crate::{
2245 assert_let_timeout,
2246 event_cache::{RoomEventCacheUpdate, room::LoadMoreEventsBackwardsOutcome},
2247 test_utils::client::MockClientBuilder,
2248 };
2249
2250 #[async_test]
2251 async fn test_write_to_storage() {
2252 let room_id = room_id!("!galette:saucisse.bzh");
2253 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2254
2255 let event_cache_store = Arc::new(MemoryStore::new());
2256
2257 let client = MockClientBuilder::new(None)
2258 .on_builder(|builder| {
2259 builder.store_config(
2260 StoreConfig::new("hodlor".to_owned())
2261 .event_cache_store(event_cache_store.clone()),
2262 )
2263 })
2264 .build()
2265 .await;
2266
2267 let event_cache = client.event_cache();
2268
2269 event_cache.subscribe().unwrap();
2271
2272 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2273 let room = client.get_room(room_id).unwrap();
2274
2275 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2276 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2277
2278 let timeline = Timeline {
2280 limited: true,
2281 prev_batch: Some("raclette".to_owned()),
2282 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
2283 };
2284
2285 room_event_cache
2286 .inner
2287 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2288 .await
2289 .unwrap();
2290
2291 assert_matches!(
2293 generic_stream.recv().await,
2294 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2295 assert_eq!(expected_room_id, room_id);
2296 }
2297 );
2298
2299 let linked_chunk = from_all_chunks::<3, _, _>(
2301 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2302 )
2303 .unwrap()
2304 .unwrap();
2305
2306 assert_eq!(linked_chunk.chunks().count(), 2);
2307
2308 let mut chunks = linked_chunk.chunks();
2309
2310 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
2312 assert_eq!(gap.prev_token, "raclette");
2313 });
2314
2315 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2317 assert_eq!(events.len(), 1);
2318 let deserialized = events[0].raw().deserialize().unwrap();
2319 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
2320 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
2321 });
2322
2323 assert!(chunks.next().is_none());
2325 }
2326
2327 #[async_test]
2328 async fn test_write_to_storage_strips_bundled_relations() {
2329 let room_id = room_id!("!galette:saucisse.bzh");
2330 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2331
2332 let event_cache_store = Arc::new(MemoryStore::new());
2333
2334 let client = MockClientBuilder::new(None)
2335 .on_builder(|builder| {
2336 builder.store_config(
2337 StoreConfig::new("hodlor".to_owned())
2338 .event_cache_store(event_cache_store.clone()),
2339 )
2340 })
2341 .build()
2342 .await;
2343
2344 let event_cache = client.event_cache();
2345
2346 event_cache.subscribe().unwrap();
2348
2349 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2350 let room = client.get_room(room_id).unwrap();
2351
2352 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2353 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2354
2355 let ev = f
2357 .text_msg("hey yo")
2358 .sender(*ALICE)
2359 .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
2360 .into_event();
2361
2362 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
2363
2364 room_event_cache
2365 .inner
2366 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2367 .await
2368 .unwrap();
2369
2370 assert_matches!(
2372 generic_stream.recv().await,
2373 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2374 assert_eq!(expected_room_id, room_id);
2375 }
2376 );
2377
2378 {
2380 let events = room_event_cache.events().await;
2381
2382 assert_eq!(events.len(), 1);
2383
2384 let ev = events[0].raw().deserialize().unwrap();
2385 assert_let!(
2386 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
2387 );
2388
2389 let original = msg.as_original().unwrap();
2390 assert_eq!(original.content.body(), "hey yo");
2391 assert!(original.unsigned.relations.replace.is_some());
2392 }
2393
2394 let linked_chunk = from_all_chunks::<3, _, _>(
2396 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2397 )
2398 .unwrap()
2399 .unwrap();
2400
2401 assert_eq!(linked_chunk.chunks().count(), 1);
2402
2403 let mut chunks = linked_chunk.chunks();
2404 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2405 assert_eq!(events.len(), 1);
2406
2407 let ev = events[0].raw().deserialize().unwrap();
2408 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
2409
2410 let original = msg.as_original().unwrap();
2411 assert_eq!(original.content.body(), "hey yo");
2412 assert!(original.unsigned.relations.replace.is_none());
2413 });
2414
2415 assert!(chunks.next().is_none());
2417 }
2418
2419 #[async_test]
2420 async fn test_clear() {
2421 let room_id = room_id!("!galette:saucisse.bzh");
2422 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2423
2424 let event_cache_store = Arc::new(MemoryStore::new());
2425
2426 let event_id1 = event_id!("$1");
2427 let event_id2 = event_id!("$2");
2428
2429 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2430 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2431
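// Prefill the store: an empty items chunk, a gap, then two chunks holding one event each.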
2432 event_cache_store
2434 .handle_linked_chunk_updates(
2435 LinkedChunkId::Room(room_id),
2436 vec![
2437 Update::NewItemsChunk {
2439 previous: None,
2440 new: ChunkIdentifier::new(0),
2441 next: None,
2442 },
2443 Update::NewGapChunk {
2445 previous: Some(ChunkIdentifier::new(0)),
2446 new: ChunkIdentifier::new(42),
2448 next: None,
2449 gap: Gap { prev_token: "comté".to_owned() },
2450 },
2451 Update::NewItemsChunk {
2453 previous: Some(ChunkIdentifier::new(42)),
2454 new: ChunkIdentifier::new(1),
2455 next: None,
2456 },
2457 Update::PushItems {
2458 at: Position::new(ChunkIdentifier::new(1), 0),
2459 items: vec![ev1.clone()],
2460 },
2461 Update::NewItemsChunk {
2463 previous: Some(ChunkIdentifier::new(1)),
2464 new: ChunkIdentifier::new(2),
2465 next: None,
2466 },
2467 Update::PushItems {
2468 at: Position::new(ChunkIdentifier::new(2), 0),
2469 items: vec![ev2.clone()],
2470 },
2471 ],
2472 )
2473 .await
2474 .unwrap();
2475
2476 let client = MockClientBuilder::new(None)
2477 .on_builder(|builder| {
2478 builder.store_config(
2479 StoreConfig::new("hodlor".to_owned())
2480 .event_cache_store(event_cache_store.clone()),
2481 )
2482 })
2483 .build()
2484 .await;
2485
2486 let event_cache = client.event_cache();
2487
2488 event_cache.subscribe().unwrap();
2490
2491 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2492 let room = client.get_room(room_id).unwrap();
2493
2494 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2495
2496 let (items, mut stream) = room_event_cache.subscribe().await;
2497 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2498
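// Both events can be found in the event cache.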
2499 {
2501 assert!(room_event_cache.find_event(event_id1).await.is_some());
2502 assert!(room_event_cache.find_event(event_id2).await.is_some());
2503 }
2504
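// But only the last event, from the last chunk, was loaded in memory.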
2505 {
2507 assert_eq!(items.len(), 1);
2509 assert_eq!(items[0].event_id().unwrap(), event_id2);
2510
2511 assert!(stream.is_empty());
2512 }
2513
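// Paginate backwards once to load the first event into memory.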
2514 {
2516 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2517
2518 assert_let_timeout!(
2519 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2520 );
2521 assert_eq!(diffs.len(), 1);
2522 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2523 assert_eq!(event.event_id().unwrap(), event_id1);
2525 });
2526
2527 assert!(stream.is_empty());
2528 }
2529
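// Now clear the room's event cache.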
2530 room_event_cache.clear().await.unwrap();
2532
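// The subscriber sees the timeline being cleared.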
2533 assert_let_timeout!(
2535 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2536 );
2537 assert_eq!(diffs.len(), 1);
2538 assert_let!(VectorDiff::Clear = &diffs[0]);
2539
2540 assert_let_timeout!(
2542 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
2543 );
2544 assert_eq!(received_room_id, room_id);
2545
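// Individual events can still be found after the clear.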
2546 assert!(room_event_cache.find_event(event_id1).await.is_some());
2549
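// But the in-memory timeline is now empty.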
2550 let items = room_event_cache.events().await;
2552 assert!(items.is_empty());
2553
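// And the store's linked chunk contains no items anymore.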
2554 let linked_chunk = from_all_chunks::<3, _, _>(
2556 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2557 )
2558 .unwrap()
2559 .unwrap();
2560
2561 assert_eq!(linked_chunk.num_items(), 0);
2565 }
2566
2567 #[async_test]
2568 async fn test_load_from_storage() {
2569 let room_id = room_id!("!galette:saucisse.bzh");
2570 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2571
2572 let event_cache_store = Arc::new(MemoryStore::new());
2573
2574 let event_id1 = event_id!("$1");
2575 let event_id2 = event_id!("$2");
2576
2577 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2578 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2579
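// Prefill the store with the same layout: an empty chunk, a gap, then two single-event chunks.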
2580 event_cache_store
2582 .handle_linked_chunk_updates(
2583 LinkedChunkId::Room(room_id),
2584 vec![
2585 Update::NewItemsChunk {
2587 previous: None,
2588 new: ChunkIdentifier::new(0),
2589 next: None,
2590 },
2591 Update::NewGapChunk {
2593 previous: Some(ChunkIdentifier::new(0)),
2594 new: ChunkIdentifier::new(42),
2596 next: None,
2597 gap: Gap { prev_token: "cheddar".to_owned() },
2598 },
2599 Update::NewItemsChunk {
2601 previous: Some(ChunkIdentifier::new(42)),
2602 new: ChunkIdentifier::new(1),
2603 next: None,
2604 },
2605 Update::PushItems {
2606 at: Position::new(ChunkIdentifier::new(1), 0),
2607 items: vec![ev1.clone()],
2608 },
2609 Update::NewItemsChunk {
2611 previous: Some(ChunkIdentifier::new(1)),
2612 new: ChunkIdentifier::new(2),
2613 next: None,
2614 },
2615 Update::PushItems {
2616 at: Position::new(ChunkIdentifier::new(2), 0),
2617 items: vec![ev2.clone()],
2618 },
2619 ],
2620 )
2621 .await
2622 .unwrap();
2623
2624 let client = MockClientBuilder::new(None)
2625 .on_builder(|builder| {
2626 builder.store_config(
2627 StoreConfig::new("hodlor".to_owned())
2628 .event_cache_store(event_cache_store.clone()),
2629 )
2630 })
2631 .build()
2632 .await;
2633
2634 let event_cache = client.event_cache();
2635
2636 event_cache.subscribe().unwrap();
2638
2639 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2641
2642 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2643 let room = client.get_room(room_id).unwrap();
2644
2645 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2646
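// A generic update for this room is received once its event cache is set up.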
2647 assert_matches!(
2650 generic_stream.recv().await,
2651 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2652 assert_eq!(room_id, expected_room_id);
2653 }
2654 );
2655
2656 let (items, mut stream) = room_event_cache.subscribe().await;
2657
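// The initial subscription only sees the last event, from the last chunk.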
2658 assert_eq!(items.len(), 1);
2661 assert_eq!(items[0].event_id().unwrap(), event_id2);
2662 assert!(stream.is_empty());
2663
2664 assert!(room_event_cache.find_event(event_id1).await.is_some());
2666 assert!(room_event_cache.find_event(event_id2).await.is_some());
2667
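// Paginate backwards to load the first event.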
2668 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2670
2671 assert_let_timeout!(
2672 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2673 );
2674 assert_eq!(diffs.len(), 1);
2675 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2676 assert_eq!(event.event_id().unwrap(), event_id1);
2677 });
2678
2679 assert!(stream.is_empty());
2680
2681 assert_matches!(
2683 generic_stream.recv().await,
2684 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2685 assert_eq!(expected_room_id, room_id);
2686 }
2687 );
2688
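// Sync ev2 again; the event is already known to the cache.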
2689 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
2691
2692 room_event_cache
2693 .inner
2694 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2695 .await
2696 .unwrap();
2697
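// The duplicate doesn't produce a generic update.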
2698 assert!(generic_stream.recv().now_or_never().is_none());
2701
2702 let items = room_event_cache.events().await;
2707 assert_eq!(items.len(), 2);
2708 assert_eq!(items[0].event_id().unwrap(), event_id1);
2709 assert_eq!(items[1].event_id().unwrap(), event_id2);
2710 }
2711
2712 #[async_test]
2713 async fn test_load_from_storage_resilient_to_failure() {
2714 let room_id = room_id!("!fondue:patate.ch");
2715 let event_cache_store = Arc::new(MemoryStore::new());
2716
2717 let event = EventFactory::new()
2718 .room(room_id)
2719 .sender(user_id!("@ben:saucisse.bzh"))
2720 .text_msg("foo")
2721 .event_id(event_id!("$42"))
2722 .into_event();
2723
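// Prefill the store with a malformed linked chunk: the second chunk's `next` points back to chunk 0.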
2724 event_cache_store
2726 .handle_linked_chunk_updates(
2727 LinkedChunkId::Room(room_id),
2728 vec![
2729 Update::NewItemsChunk {
2730 previous: None,
2731 new: ChunkIdentifier::new(0),
2732 next: None,
2733 },
2734 Update::PushItems {
2735 at: Position::new(ChunkIdentifier::new(0), 0),
2736 items: vec![event],
2737 },
2738 Update::NewItemsChunk {
2739 previous: Some(ChunkIdentifier::new(0)),
2740 new: ChunkIdentifier::new(1),
2741 next: Some(ChunkIdentifier::new(0)),
2742 },
2743 ],
2744 )
2745 .await
2746 .unwrap();
2747
2748 let client = MockClientBuilder::new(None)
2749 .on_builder(|builder| {
2750 builder.store_config(
2751 StoreConfig::new("holder".to_owned())
2752 .event_cache_store(event_cache_store.clone()),
2753 )
2754 })
2755 .build()
2756 .await;
2757
2758 let event_cache = client.event_cache();
2759
2760 event_cache.subscribe().unwrap();
2762
2763 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2764 let room = client.get_room(room_id).unwrap();
2765
2766 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2767
2768 let items = room_event_cache.events().await;
2769
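// Because the stored linked chunk is malformed, nothing is loaded in memory.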
2770 assert!(items.is_empty());
2773
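// The corrupted chunks have also been wiped from the store.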
2774 let raw_chunks =
2777 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
2778 assert!(raw_chunks.is_empty());
2779 }
2780
2781 #[async_test]
2782 async fn test_no_useless_gaps() {
2783 let room_id = room_id!("!galette:saucisse.bzh");
2784
2785 let client = MockClientBuilder::new(None).build().await;
2786
2787 let event_cache = client.event_cache();
2788 event_cache.subscribe().unwrap();
2789
2790 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2791 let room = client.get_room(room_id).unwrap();
2792 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2793 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2794
2795 let f = EventFactory::new().room(room_id).sender(*ALICE);
2796
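// Sync a limited timeline that carries a prev-batch token.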
2797 room_event_cache
2800 .inner
2801 .handle_joined_room_update(JoinedRoomUpdate {
2802 timeline: Timeline {
2803 limited: true,
2804 prev_batch: Some("raclette".to_owned()),
2805 events: vec![f.text_msg("hey yo").into_event()],
2806 },
2807 ..Default::default()
2808 })
2809 .await
2810 .unwrap();
2811
2812 assert_matches!(
2814 generic_stream.recv().await,
2815 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2816 assert_eq!(expected_room_id, room_id);
2817 }
2818 );
2819
2820 {
2821 let mut state = room_event_cache.inner.state.write().await;
2822
2823 let mut num_gaps = 0;
2824 let mut num_events = 0;
2825
2826 for c in state.room_linked_chunk().chunks() {
2827 match c.content() {
2828 ChunkContent::Items(items) => num_events += items.len(),
2829 ChunkContent::Gap(_) => num_gaps += 1,
2830 }
2831 }
2832
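// In memory, there's no gap yet: only the event from the sync.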
2833 assert_eq!(num_gaps, 0);
2836 assert_eq!(num_events, 1);
2837
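// Loading one more chunk backwards hits the gap, which is now brought into memory.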
2838 assert_matches!(
2840 state.load_more_events_backwards().await.unwrap(),
2841 LoadMoreEventsBackwardsOutcome::Gap { .. }
2842 );
2843
2844 num_gaps = 0;
2845 num_events = 0;
2846 for c in state.room_linked_chunk().chunks() {
2847 match c.content() {
2848 ChunkContent::Items(items) => num_events += items.len(),
2849 ChunkContent::Gap(_) => num_gaps += 1,
2850 }
2851 }
2852
2853 assert_eq!(num_gaps, 1);
2855 assert_eq!(num_events, 1);
2856 }
2857
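// Sync a non-limited timeline that also carries a prev-batch token.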
2858 room_event_cache
2861 .inner
2862 .handle_joined_room_update(JoinedRoomUpdate {
2863 timeline: Timeline {
2864 limited: false,
2865 prev_batch: Some("fondue".to_owned()),
2866 events: vec![f.text_msg("sup").into_event()],
2867 },
2868 ..Default::default()
2869 })
2870 .await
2871 .unwrap();
2872
2873 assert_matches!(
2875 generic_stream.recv().await,
2876 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2877 assert_eq!(expected_room_id, room_id);
2878 }
2879 );
2880
2881 {
2882 let state = room_event_cache.inner.state.read().await;
2883
2884 let mut num_gaps = 0;
2885 let mut num_events = 0;
2886
2887 for c in state.room_linked_chunk().chunks() {
2888 match c.content() {
2889 ChunkContent::Items(items) => num_events += items.len(),
2890 ChunkContent::Gap(gap) => {
2891 assert_eq!(gap.prev_token, "raclette");
2892 num_gaps += 1;
2893 }
2894 }
2895 }
2896
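// The new prev-batch token was useless (the sync wasn't limited), so no extra gap was created; only the original one remains.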
2897 assert_eq!(num_gaps, 1);
2899 assert_eq!(num_events, 2);
2900 }
2901 }
2902
2903 #[async_test]
2904 async fn test_shrink_to_last_chunk() {
2905 let room_id = room_id!("!galette:saucisse.bzh");
2906
2907 let client = MockClientBuilder::new(None).build().await;
2908
2909 let f = EventFactory::new().room(room_id);
2910
2911 let evid1 = event_id!("$1");
2912 let evid2 = event_id!("$2");
2913
2914 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
2915 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
2916
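// Prefill the store with two single-event chunks.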
2917 {
2919 let store = client.event_cache_store();
2920 let store = store.lock().await.unwrap();
2921 store
2922 .handle_linked_chunk_updates(
2923 LinkedChunkId::Room(room_id),
2924 vec![
2925 Update::NewItemsChunk {
2926 previous: None,
2927 new: ChunkIdentifier::new(0),
2928 next: None,
2929 },
2930 Update::PushItems {
2931 at: Position::new(ChunkIdentifier::new(0), 0),
2932 items: vec![ev1],
2933 },
2934 Update::NewItemsChunk {
2935 previous: Some(ChunkIdentifier::new(0)),
2936 new: ChunkIdentifier::new(1),
2937 next: None,
2938 },
2939 Update::PushItems {
2940 at: Position::new(ChunkIdentifier::new(1), 0),
2941 items: vec![ev2],
2942 },
2943 ],
2944 )
2945 .await
2946 .unwrap();
2947 }
2948
2949 let event_cache = client.event_cache();
2950 event_cache.subscribe().unwrap();
2951
2952 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2953 let room = client.get_room(room_id).unwrap();
2954 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2955
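// Initially, only the last chunk (and thus the last event) is loaded.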
2956 let (events, mut stream) = room_event_cache.subscribe().await;
2958 assert_eq!(events.len(), 1);
2959 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
2960 assert!(stream.is_empty());
2961
2962 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2963
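// Paginating backwards loads the first event and reaches the start of the timeline.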
2964 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2966 assert_eq!(outcome.events.len(), 1);
2967 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2968 assert!(outcome.reached_start);
2969
2970 assert_let_timeout!(
2972 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2973 );
2974 assert_eq!(diffs.len(), 1);
2975 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
2976 assert_eq!(value.event_id().as_deref(), Some(evid1));
2977 });
2978
2979 assert!(stream.is_empty());
2980
2981 assert_let_timeout!(
2983 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
2984 );
2985 assert_eq!(received_room_id, room_id);
2986
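// Force the linked chunk to shrink back to its last chunk.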
2987 let diffs = room_event_cache
2989 .inner
2990 .state
2991 .write()
2992 .await
2993 .force_shrink_to_last_chunk()
2994 .await
2995 .expect("shrinking should succeed");
2996
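// The shrink produces two diffs: a Clear followed by an Append of the last chunk's events.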
2997 assert_eq!(diffs.len(), 2);
2999 assert_matches!(&diffs[0], VectorDiff::Clear);
3000 assert_matches!(&diffs[1], VectorDiff::Append { values } => {
3001 assert_eq!(values.len(), 1);
3002 assert_eq!(values[0].event_id().as_deref(), Some(evid2));
3003 });
3004
3005 assert!(stream.is_empty());
3006
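// No generic update is emitted for the shrink.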
3007 assert!(generic_stream.is_empty());
3009
3010 let events = room_event_cache.events().await;
3012 assert_eq!(events.len(), 1);
3013 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3014
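// Paginating again reloads the first chunk from the store.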
3015 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3018 assert_eq!(outcome.events.len(), 1);
3019 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3020 assert!(outcome.reached_start);
3021 }
3022
3023 #[async_test]
3024 async fn test_room_ordering() {
3025 let room_id = room_id!("!galette:saucisse.bzh");
3026
3027 let client = MockClientBuilder::new(None).build().await;
3028
3029 let f = EventFactory::new().room(room_id).sender(*ALICE);
3030
3031 let evid1 = event_id!("$1");
3032 let evid2 = event_id!("$2");
3033 let evid3 = event_id!("$3");
3034
3035 let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
3036 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3037 let ev3 = f.text_msg("yo").event_id(evid3).into_event();
3038
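// Prefill the store: [ev1, ev2] in the first chunk, [ev3] in the last.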
3039 {
3041 let store = client.event_cache_store();
3042 let store = store.lock().await.unwrap();
3043 store
3044 .handle_linked_chunk_updates(
3045 LinkedChunkId::Room(room_id),
3046 vec![
3047 Update::NewItemsChunk {
3048 previous: None,
3049 new: ChunkIdentifier::new(0),
3050 next: None,
3051 },
3052 Update::PushItems {
3053 at: Position::new(ChunkIdentifier::new(0), 0),
3054 items: vec![ev1, ev2],
3055 },
3056 Update::NewItemsChunk {
3057 previous: Some(ChunkIdentifier::new(0)),
3058 new: ChunkIdentifier::new(1),
3059 next: None,
3060 },
3061 Update::PushItems {
3062 at: Position::new(ChunkIdentifier::new(1), 0),
3063 items: vec![ev3.clone()],
3064 },
3065 ],
3066 )
3067 .await
3068 .unwrap();
3069 }
3070
3071 let event_cache = client.event_cache();
3072 event_cache.subscribe().unwrap();
3073
3074 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3075 let room = client.get_room(room_id).unwrap();
3076 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3077
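// Only the last chunk is loaded, but the ordering of all known events is still available.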
3078 {
3081 let state = room_event_cache.inner.state.read().await;
3082
3083 assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));
3085
3086 assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));
3088
3089 let mut events = state.room_linked_chunk().events();
3091 let (pos, ev) = events.next().unwrap();
3092 assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
3093 assert_eq!(ev.event_id().as_deref(), Some(evid3));
3094 assert_eq!(state.room_event_order(pos), Some(2));
3095
3096 assert!(events.next().is_none());
3098 }
3099
3100 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3102 assert!(outcome.reached_start);
3103
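// With everything loaded, event orders match the in-memory indices.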
3104 {
3107 let state = room_event_cache.inner.state.read().await;
3108 for (i, (pos, _)) in state.room_linked_chunk().events().enumerate() {
3109 assert_eq!(state.room_event_order(pos), Some(i));
3110 }
3111 }
3112
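// Sync a limited timeline containing ev3 (already known) plus a new event ev4.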
3113 let evid4 = event_id!("$4");
3118 room_event_cache
3119 .inner
3120 .handle_joined_room_update(JoinedRoomUpdate {
3121 timeline: Timeline {
3122 limited: true,
3123 prev_batch: Some("fondue".to_owned()),
3124 events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
3125 },
3126 ..Default::default()
3127 })
3128 .await
3129 .unwrap();
3130
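// Only the freshly synced events remain in memory; the ordering is updated accordingly.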
3131 {
3132 let state = room_event_cache.inner.state.read().await;
3133
3134 let mut events = state.room_linked_chunk().events();
3136
3137 let (pos, ev) = events.next().unwrap();
3138 assert_eq!(ev.event_id().as_deref(), Some(evid3));
3139 assert_eq!(state.room_event_order(pos), Some(2));
3140
3141 let (pos, ev) = events.next().unwrap();
3142 assert_eq!(ev.event_id().as_deref(), Some(evid4));
3143 assert_eq!(state.room_event_order(pos), Some(3));
3144
3145 assert!(events.next().is_none());
3147
3148 assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));
3150 assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));
3151
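// The previous position of ev3 no longer maps to an order.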
3152 assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(1), 0)), None);
3155 }
3156 }
3157
3158 #[async_test]
3159 async fn test_auto_shrink_after_all_subscribers_are_gone() {
3160 let room_id = room_id!("!galette:saucisse.bzh");
3161
3162 let client = MockClientBuilder::new(None).build().await;
3163
3164 let f = EventFactory::new().room(room_id);
3165
3166 let evid1 = event_id!("$1");
3167 let evid2 = event_id!("$2");
3168
3169 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3170 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3171
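// Prefill the store with two single-event chunks.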
3172 {
3174 let store = client.event_cache_store();
3175 let store = store.lock().await.unwrap();
3176 store
3177 .handle_linked_chunk_updates(
3178 LinkedChunkId::Room(room_id),
3179 vec![
3180 Update::NewItemsChunk {
3181 previous: None,
3182 new: ChunkIdentifier::new(0),
3183 next: None,
3184 },
3185 Update::PushItems {
3186 at: Position::new(ChunkIdentifier::new(0), 0),
3187 items: vec![ev1],
3188 },
3189 Update::NewItemsChunk {
3190 previous: Some(ChunkIdentifier::new(0)),
3191 new: ChunkIdentifier::new(1),
3192 next: None,
3193 },
3194 Update::PushItems {
3195 at: Position::new(ChunkIdentifier::new(1), 0),
3196 items: vec![ev2],
3197 },
3198 ],
3199 )
3200 .await
3201 .unwrap();
3202 }
3203
3204 let event_cache = client.event_cache();
3205 event_cache.subscribe().unwrap();
3206
3207 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3208 let room = client.get_room(room_id).unwrap();
3209 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3210
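// The first subscriber initially sees only the last event.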
3211 let (events1, mut stream1) = room_event_cache.subscribe().await;
3213 assert_eq!(events1.len(), 1);
3214 assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
3215 assert!(stream1.is_empty());
3216
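// Paginate to load the first event as well.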
3217 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3219 assert_eq!(outcome.events.len(), 1);
3220 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3221 assert!(outcome.reached_start);
3222
3223 assert_let_timeout!(
3226 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
3227 );
3228 assert_eq!(diffs.len(), 1);
3229 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3230 assert_eq!(value.event_id().as_deref(), Some(evid1));
3231 });
3232
3233 assert!(stream1.is_empty());
3234
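// A second subscriber sees both events, since the pagination expanded what's in memory.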
3235 let (events2, stream2) = room_event_cache.subscribe().await;
3239 assert_eq!(events2.len(), 2);
3240 assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
3241 assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
3242 assert!(stream2.is_empty());
3243
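// Dropping the first subscriber doesn't trigger the auto-shrink: another subscriber is still alive.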
3244 drop(stream1);
3246 yield_now().await;
3247
3248 assert!(stream2.is_empty());
3250
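// Dropping the last subscriber notifies the auto-shrink task.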
3251 drop(stream2);
3253 yield_now().await;
3254
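// Once the auto-shrink task has run, no subscriber is registered anymore.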
3255 {
3258 let state = room_event_cache.inner.state.read().await;
3260 assert_eq!(state.subscriber_count.load(std::sync::atomic::Ordering::SeqCst), 0);
3261 }
3262
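// And the in-memory linked chunk has shrunk back to its last chunk.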
3263 let events3 = room_event_cache.events().await;
3265 assert_eq!(events3.len(), 1);
3266 assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
3267 }
3268
3269 #[async_test]
3270 async fn test_rfind_map_event_in_memory_by() {
3271 let user_id = user_id!("@mnt_io:matrix.org");
3272 let room_id = room_id!("!raclette:patate.ch");
3273 let client = MockClientBuilder::new(None).build().await;
3274
3275 let event_factory = EventFactory::new().room(room_id);
3276
3277 let event_id_0 = event_id!("$ev0");
3278 let event_id_1 = event_id!("$ev1");
3279 let event_id_2 = event_id!("$ev2");
3280 let event_id_3 = event_id!("$ev3");
3281
3282 let event_0 =
3283 event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3284 let event_1 =
3285 event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3286 let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3287 let event_3 =
3288 event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3289
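// Prefill the store: event_3 alone in the first chunk, events 0, 1 and 2 in the last chunk.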
3290 {
3293 let store = client.event_cache_store();
3294 let store = store.lock().await.unwrap();
3295 store
3296 .handle_linked_chunk_updates(
3297 LinkedChunkId::Room(room_id),
3298 vec![
3299 Update::NewItemsChunk {
3300 previous: None,
3301 new: ChunkIdentifier::new(0),
3302 next: None,
3303 },
3304 Update::PushItems {
3305 at: Position::new(ChunkIdentifier::new(0), 0),
3306 items: vec![event_3],
3307 },
3308 Update::NewItemsChunk {
3309 previous: Some(ChunkIdentifier::new(0)),
3310 new: ChunkIdentifier::new(1),
3311 next: None,
3312 },
3313 Update::PushItems {
3314 at: Position::new(ChunkIdentifier::new(1), 0),
3315 items: vec![event_0, event_1, event_2],
3316 },
3317 ],
3318 )
3319 .await
3320 .unwrap();
3321 }
3322
3323 let event_cache = client.event_cache();
3324 event_cache.subscribe().unwrap();
3325
3326 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3327 let room = client.get_room(room_id).unwrap();
3328 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3329
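// Searching backwards through the in-memory events, the last one sent by BOB is event_0.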
3330 assert_matches!(
3332 room_event_cache
3333 .rfind_map_event_in_memory_by(|event| {
3334 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| event.event_id())
3335 })
3336 .await,
3337 Some(event_id) => {
3338 assert_eq!(event_id.as_deref(), Some(event_id_0));
3339 }
3340 );
3341
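// The last in-memory event sent by ALICE is event_2.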
3342 assert_matches!(
3345 room_event_cache
3346 .rfind_map_event_in_memory_by(|event| {
3347 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| event.event_id())
3348 })
3349 .await,
3350 Some(event_id) => {
3351 assert_eq!(event_id.as_deref(), Some(event_id_2));
3352 }
3353 );
3354
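// event_3 lives in a chunk that isn't loaded in memory, so it can't be found.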
3355 assert!(
3357 room_event_cache
3358 .rfind_map_event_in_memory_by(|event| {
3359 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
3360 == Some(user_id))
3361 .then(|| event.event_id())
3362 })
3363 .await
3364 .is_none()
3365 );
3366
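// A predicate that never matches yields None.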
3367 assert!(room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.is_none());
3369 }
3370}