use std::{
    collections::BTreeMap,
    fmt,
    ops::{Deref, DerefMut},
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};

use events::sort_positions_descending;
use eyeball::SharedObservable;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
    deserialized_responses::AmbiguityChange,
    event_cache::Event,
    linked_chunk::Position,
    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
    api::Direction,
    events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
    serde::Raw,
    EventId, OwnedEventId, OwnedRoomId,
};
use tokio::sync::{
    broadcast::{Receiver, Sender},
    mpsc, Notify, RwLock,
};
use tracing::{instrument, trace, warn};

use super::{
    AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
    RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
};
use crate::{
    client::WeakClient,
    event_cache::EventCacheError,
    room::{IncludeRelations, RelationsOptions, WeakRoom},
};

pub(super) mod events;
mod threads;

pub use threads::ThreadEventCacheUpdate;

/// A cheaply cloneable, per-room handle to the event cache.
#[derive(Clone)]
pub struct RoomEventCache {
    pub(super) inner: Arc<RoomEventCacheInner>,
}

impl fmt::Debug for RoomEventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RoomEventCache").finish_non_exhaustive()
    }
}

/// A subscriber to the updates of a [`RoomEventCache`].
///
/// When the last subscriber for a room is dropped, the auto-shrink task is
/// notified so the room's in-memory linked chunk can be unloaded.
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheSubscriber {
    /// Receiver of the per-room event cache updates.
    recv: Receiver<RoomEventCacheUpdate>,

    /// The room this subscriber observes.
    room_id: OwnedRoomId,

    /// Sender used to notify the auto-shrink task when the last subscriber
    /// goes away.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Shared count of the live subscribers for this room.
    subscriber_count: Arc<AtomicUsize>,
}

impl Drop for RoomEventCacheSubscriber {
    fn drop(&mut self) {
        let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);

        trace!(
            "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
        );

        if previous_subscriber_count == 1 {
            // We were the last subscriber: notify the auto-shrink task so the room's
            // in-memory linked chunk can be unloaded.
            let mut room_id = self.room_id.clone();

            let mut num_attempts = 0;

            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
                num_attempts += 1;

                if num_attempts > 1024 {
                    warn!(
                        "couldn't send notification to the auto-shrink channel \
                         after 1024 attempts; giving up"
                    );
                    return;
                }

                match err {
                    mpsc::error::TrySendError::Full(stolen_room_id) => {
                        room_id = stolen_room_id;
                    }
                    mpsc::error::TrySendError::Closed(_) => return,
                }
            }

            trace!("sent notification to the parent channel that we were the last subscriber");
        }
    }
}

impl Deref for RoomEventCacheSubscriber {
    type Target = Receiver<RoomEventCacheUpdate>;

    fn deref(&self) -> &Self::Target {
        &self.recv
    }
}

impl DerefMut for RoomEventCacheSubscriber {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.recv
    }
}

impl RoomEventCache {
    pub(super) fn new(
        client: WeakClient,
        state: RoomEventCacheState,
        pagination_status: SharedObservable<RoomPaginationStatus>,
        room_id: OwnedRoomId,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
    ) -> Self {
        Self {
            inner: Arc::new(RoomEventCacheInner::new(
                client,
                state,
                pagination_status,
                room_id,
                auto_shrink_sender,
                generic_update_sender,
            )),
        }
    }

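    /// Returns the events currently loaded in memory for this room, in the
    /// order they are stored in the linked chunk.
    ///
    /// A usage sketch (hedged: how the `RoomEventCache` handle is obtained is
    /// an assumption mirroring the tests below, not part of the original
    /// documentation):
    ///
    /// ```ignore
    /// let (room_event_cache, _drop_handles) = room.event_cache().await?;
    /// let loaded_events = room_event_cache.events().await;
    /// println!("{} events loaded in memory", loaded_events.len());
    /// ```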
    pub async fn events(&self) -> Vec<Event> {
        let state = self.inner.state.read().await;

        state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect()
    }

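    /// Returns the events currently loaded in memory, along with a
    /// [`RoomEventCacheSubscriber`] that receives subsequent
    /// [`RoomEventCacheUpdate`]s for this room.
    ///
    /// A usage sketch (hedged: the bindings and surrounding async context are
    /// illustrative assumptions, not part of the original documentation):
    ///
    /// ```ignore
    /// let (initial_events, mut subscriber) = room_event_cache.subscribe().await;
    /// // `RoomEventCacheSubscriber` derefs to a broadcast `Receiver`.
    /// while let Ok(update) = subscriber.recv().await {
    ///     if let RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } = update {
    ///         // Apply the `VectorDiff`s to a local copy of `initial_events`.
    ///     }
    /// }
    /// ```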
    pub async fn subscribe(&self) -> (Vec<Event>, RoomEventCacheSubscriber) {
        let state = self.inner.state.read().await;
        let events =
            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();

        let previous_subscriber_count = state.subscriber_count.fetch_add(1, Ordering::SeqCst);
        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);

        let recv = self.inner.sender.subscribe();
        let subscriber = RoomEventCacheSubscriber {
            recv,
            room_id: self.inner.room_id.clone(),
            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
            subscriber_count: state.subscriber_count.clone(),
        };

        (events, subscriber)
    }

    pub async fn subscribe_to_thread(
        &self,
        thread_root: OwnedEventId,
    ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
        let mut state = self.inner.state.write().await;
        state.subscribe_to_thread(thread_root)
    }

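    /// Paginates backwards in the thread rooted at `thread_root`, requesting
    /// up to `num_events` events per `/relations` call.
    ///
    /// Returns whether the start of the thread has been reached.
    ///
    /// A usage sketch (hedged: the loop and bindings are illustrative
    /// assumptions, not part of the original documentation):
    ///
    /// ```ignore
    /// let (thread_events, _thread_updates) =
    ///     room_event_cache.subscribe_to_thread(thread_root.clone()).await;
    ///
    /// let mut reached_start = false;
    /// while !reached_start {
    ///     reached_start =
    ///         room_event_cache.paginate_thread_backwards(thread_root.clone(), 20).await?;
    /// }
    /// ```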
    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
    pub async fn paginate_thread_backwards(
        &self,
        thread_root: OwnedEventId,
        num_events: u16,
    ) -> Result<bool> {
        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;

        let mut outcome =
            self.inner.state.write().await.load_more_thread_events_backwards(thread_root.clone());

        loop {
            match outcome {
                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
                    let options = RelationsOptions {
                        from: prev_token.clone(),
                        dir: Direction::Backward,
                        limit: Some(num_events.into()),
                        include_relations: IncludeRelations::AllRelations,
                        recurse: true,
                    };

                    let mut result = room
                        .relations(thread_root.clone(), options)
                        .await
                        .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

                    let reached_start = result.next_batch_token.is_none();
                    trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");

                    if reached_start {
                        // The start of the thread has been reached: include the thread root
                        // event itself.
                        let root_event = room
                            .load_or_fetch_event(&thread_root, None)
                            .await
                            .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

                        result.chunk.push(root_event);
                    }

                    let mut state = self.inner.state.write().await;

                    if let Some(outcome) = state.finish_thread_network_pagination(
                        thread_root.clone(),
                        prev_token,
                        result.next_batch_token,
                        result.chunk,
                    ) {
                        return Ok(outcome.reached_start);
                    }

                    outcome = state.load_more_thread_events_backwards(thread_root.clone());
                }

                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
                    return Ok(true);
                }

                LoadMoreEventsBackwardsOutcome::Events { .. } => {
                    unimplemented!("loading from disk for threads is not implemented yet");
                }

                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
                    unreachable!("unused for threads")
                }
            }
        }
    }

    pub fn pagination(&self) -> RoomPagination {
        RoomPagination { inner: self.inner.clone() }
    }

    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Option<O>
    where
        P: FnMut(&Event) -> Option<O>,
    {
        self.inner.state.read().await.rfind_map_event_in_memory_by(predicate)
    }

    pub async fn find_event(&self, event_id: &EventId) -> Option<Event> {
        self.inner
            .state
            .read()
            .await
            .find_event(event_id)
            .await
            .ok()
            .flatten()
            .map(|(_loc, event)| event)
    }

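    /// Looks up an event in the event cache, together with the transitive
    /// closure of its related events, optionally filtered by relation type.
    ///
    /// A usage sketch (hedged: the event id and filter are placeholder
    /// assumptions, not part of the original documentation):
    ///
    /// ```ignore
    /// use ruma::{event_id, events::relation::RelationType};
    ///
    /// let filter = Some(vec![RelationType::Replacement]);
    /// if let Some((event, related)) = room_event_cache
    ///     .find_event_with_relations(event_id!("$original"), filter)
    ///     .await
    /// {
    ///     println!("found the event and {} related events", related.len());
    /// }
    /// ```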
    pub async fn find_event_with_relations(
        &self,
        event_id: &EventId,
        filter: Option<Vec<RelationType>>,
    ) -> Option<(Event, Vec<Event>)> {
        self.inner
            .state
            .read()
            .await
            .find_event_with_relations(event_id, filter.clone())
            .await
            .ok()
            .flatten()
    }

    pub async fn clear(&self) -> Result<()> {
        let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;

        let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
            diffs: updates_as_vector_diffs,
            origin: EventsOrigin::Cache,
        });

        let _ = self
            .inner
            .generic_update_sender
            .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });

        Ok(())
    }

    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
        if let Err(err) = self.inner.state.write().await.save_event(events).await {
            warn!("couldn't save event in the event cache: {err}");
        }
    }

    pub async fn debug_string(&self) -> Vec<String> {
        self.inner.state.read().await.room_linked_chunk().debug_string()
    }
}

pub(super) struct RoomEventCacheInner {
    pub(super) room_id: OwnedRoomId,

    pub weak_room: WeakRoom,

    pub sender: Sender<RoomEventCacheUpdate>,

    pub state: RwLock<RoomEventCacheState>,

    pub pagination_batch_token_notifier: Notify,

    pub pagination_status: SharedObservable<RoomPaginationStatus>,

    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
}

impl RoomEventCacheInner {
    fn new(
        client: WeakClient,
        state: RoomEventCacheState,
        pagination_status: SharedObservable<RoomPaginationStatus>,
        room_id: OwnedRoomId,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
    ) -> Self {
        let sender = Sender::new(32);
        let weak_room = WeakRoom::new(client, room_id);
        Self {
            room_id: weak_room.room_id().to_owned(),
            weak_room,
            state: RwLock::new(state),
            sender,
            pagination_batch_token_notifier: Default::default(),
            auto_shrink_sender,
            pagination_status,
            generic_update_sender,
        }
    }

    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
        if account_data.is_empty() {
            return;
        }

        let mut handled_read_marker = false;

        trace!("Handling account data");

        for raw_event in account_data {
            match raw_event.deserialize() {
                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
                    // Only handle the first read marker event.
                    if handled_read_marker {
                        continue;
                    }

                    handled_read_marker = true;

                    let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
                        event_id: ev.content.event_id,
                    });
                }

                Ok(_) => {
                    // Ignore other account data events.
                }

                Err(e) => {
                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
                    warn!(event_type, "Failed to deserialize account data: {e}");
                }
            }
        }
    }

    #[instrument(skip_all, fields(room_id = %self.room_id))]
    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
        self.handle_timeline(
            updates.timeline,
            updates.ephemeral.clone(),
            updates.ambiguity_changes,
        )
        .await?;
        self.handle_account_data(updates.account_data);

        Ok(())
    }

    #[instrument(skip_all, fields(room_id = %self.room_id))]
    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
        self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;

        Ok(())
    }

    async fn handle_timeline(
        &self,
        timeline: Timeline,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        if timeline.events.is_empty()
            && timeline.prev_batch.is_none()
            && ephemeral_events.is_empty()
            && ambiguity_changes.is_empty()
        {
            return Ok(());
        }

        trace!("adding new events");

        let (stored_prev_batch_token, timeline_event_diffs) =
            self.state.write().await.handle_sync(timeline).await?;

        if stored_prev_batch_token {
            self.pagination_batch_token_notifier.notify_one();
        }

        if !timeline_event_diffs.is_empty() {
            let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: timeline_event_diffs,
                origin: EventsOrigin::Sync,
            });

            let _ = self
                .generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
        }

        if !ephemeral_events.is_empty() {
            let _ = self
                .sender
                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
        }

        if !ambiguity_changes.is_empty() {
            let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
        }

        Ok(())
    }
}

#[derive(Debug)]
pub(super) enum LoadMoreEventsBackwardsOutcome {
    Gap {
        prev_token: Option<String>,
    },

    StartOfTimeline,

    Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },

    WaitForInitialPrevToken,
}

mod private {
    use std::{
        collections::{BTreeMap, HashMap, HashSet},
        sync::{atomic::AtomicUsize, Arc},
    };

    use eyeball::SharedObservable;
    use eyeball_im::VectorDiff;
    use matrix_sdk_base::{
        apply_redaction,
        deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
        event_cache::{
            store::{DynEventCacheStore, EventCacheStoreLock},
            Event, Gap,
        },
        linked_chunk::{
            lazy_loader::{self},
            ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
            OwnedLinkedChunkId, Position, Update,
        },
        serde_helpers::extract_thread_root,
        sync::Timeline,
    };
    use matrix_sdk_common::executor::spawn;
    use ruma::{
        events::{
            relation::RelationType, room::redaction::SyncRoomRedactionEvent,
            AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
        },
        room_version_rules::RoomVersionRules,
        serde::Raw,
        EventId, OwnedEventId, OwnedRoomId,
    };
    use tokio::sync::broadcast::{Receiver, Sender};
    use tracing::{debug, error, instrument, trace, warn};

    use super::{
        super::{deduplicator::DeduplicationOutcome, EventCacheError},
        events::EventLinkedChunk,
        sort_positions_descending, EventLocation, LoadMoreEventsBackwardsOutcome,
    };
    use crate::event_cache::{
        deduplicator::filter_duplicate_events, room::threads::ThreadEventCache,
        BackPaginationOutcome, RoomEventCacheLinkedChunkUpdate, RoomPaginationStatus,
        ThreadEventCacheUpdate,
    };

    pub struct RoomEventCacheState {
        room: OwnedRoomId,

        room_version_rules: RoomVersionRules,

        enabled_thread_support: bool,

        store: EventCacheStoreLock,

        room_linked_chunk: EventLinkedChunk,

        threads: HashMap<OwnedEventId, ThreadEventCache>,

        pub waited_for_initial_prev_token: bool,

        pagination_status: SharedObservable<RoomPaginationStatus>,

        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

        pub(super) subscriber_count: Arc<AtomicUsize>,
    }

    impl RoomEventCacheState {
        pub async fn new(
            room_id: OwnedRoomId,
            room_version_rules: RoomVersionRules,
            enabled_thread_support: bool,
            linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
            store: EventCacheStoreLock,
            pagination_status: SharedObservable<RoomPaginationStatus>,
        ) -> Result<Self, EventCacheError> {
            let store_lock = store.lock().await?;

            let linked_chunk_id = LinkedChunkId::Room(&room_id);

            let full_linked_chunk_metadata =
                match Self::load_linked_chunk_metadata(&*store_lock, linked_chunk_id).await {
                    Ok(metas) => metas,
                    Err(err) => {
                        error!(
                            "error when loading a linked chunk's metadata from the store: {err}"
                        );

                        store_lock
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        None
                    }
                };

            let linked_chunk = match store_lock
                .load_last_chunk(linked_chunk_id)
                .await
                .map_err(EventCacheError::from)
                .and_then(|(last_chunk, chunk_identifier_generator)| {
                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
                        .map_err(EventCacheError::from)
                }) {
                Ok(linked_chunk) => linked_chunk,
                Err(err) => {
                    error!(
                        "error when loading a linked chunk's latest chunk from the store: {err}"
                    );

                    store_lock
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    None
                }
            };

            let room_linked_chunk = EventLinkedChunk::with_initial_linked_chunk(
                linked_chunk,
                full_linked_chunk_metadata,
            );

            let threads = HashMap::new();

            Ok(Self {
                room: room_id,
                room_version_rules,
                enabled_thread_support,
                store,
                room_linked_chunk,
                threads,
                waited_for_initial_prev_token: false,
                subscriber_count: Default::default(),
                pagination_status,
                linked_chunk_update_sender,
            })
        }

        async fn load_linked_chunk_metadata(
            store: &DynEventCacheStore,
            linked_chunk_id: LinkedChunkId<'_>,
        ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
            let mut all_chunks = store
                .load_all_chunks_metadata(linked_chunk_id)
                .await
                .map_err(EventCacheError::from)?;

            if all_chunks.is_empty() {
                return Ok(None);
            }

            let chunk_map: HashMap<_, _> =
                all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();

            let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
            let Some(last) = iter.next() else {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: "no last chunk found".to_owned(),
                });
            };

            if let Some(other_last) = iter.next() {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "chunks {} and {} both claim to be last chunks",
                        last.identifier.index(),
                        other_last.identifier.index()
                    ),
                });
            }

            let mut seen = HashSet::new();
            let mut current = last;
            loop {
                if !seen.insert(current.identifier) {
                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
                        details: format!(
                            "cycle detected in linked chunk at {}",
                            current.identifier.index()
                        ),
                    });
                }

                let Some(prev_id) = current.previous else {
                    if seen.len() != all_chunks.len() {
                        return Err(EventCacheError::InvalidLinkedChunkMetadata {
                            details: format!(
                                "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
                                seen.len(),
                                all_chunks.len()
                            ),
                        });
                    }
                    break;
                };

                let Some(pred_meta) = chunk_map.get(&prev_id) else {
                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
                        details: format!(
                            "missing predecessor {} chunk for {}",
                            prev_id.index(),
                            current.identifier.index()
                        ),
                    });
                };

                if pred_meta.next != Some(current.identifier) {
                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
                        details: format!(
                            "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
                            pred_meta.identifier.index(),
                            pred_meta.next.map(|chunk_id| chunk_id.index()),
                            current.identifier.index()
                        ),
                    });
                }

                current = *pred_meta;
            }

            // Reorder the metadata in place, from first to last chunk, by following the
            // `next` pointers starting from the first chunk found above.
            let mut current = current.identifier;
            for i in 0..all_chunks.len() {
                let j = all_chunks
                    .iter()
                    .rev()
                    .position(|meta| meta.identifier == current)
                    .map(|j| all_chunks.len() - 1 - j)
                    .expect("the target chunk must be present in the metadata");
                if i != j {
                    all_chunks.swap(i, j);
                }
                if let Some(next) = all_chunks[i].next {
                    current = next;
                }
            }

            Ok(Some(all_chunks))
        }

        fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome {
            if self.room_linked_chunk.events().next().is_some() {
                trace!("chunk is fully loaded and non-empty: reached_start=true");
                LoadMoreEventsBackwardsOutcome::StartOfTimeline
            } else if !self.waited_for_initial_prev_token {
                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken
            } else {
                LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
            }
        }

        pub(in super::super) async fn load_more_events_backwards(
            &mut self,
        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
            if let Some(prev_token) = self.room_linked_chunk.rgap().map(|gap| gap.prev_token) {
                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
            }

            let first_chunk_identifier = self
                .room_linked_chunk
                .chunks()
                .next()
                .expect("a linked chunk is never empty")
                .identifier();

            let store = self.store.lock().await?;

            let linked_chunk_id = LinkedChunkId::Room(&self.room);
            let new_first_chunk = match store
                .load_previous_chunk(linked_chunk_id, first_chunk_identifier)
                .await
            {
                Ok(Some(new_first_chunk)) => new_first_chunk,

                Ok(None) => {
                    return Ok(self.conclude_load_more_for_fully_loaded_chunk());
                }

                Err(err) => {
                    error!("error when loading the previous chunk of a linked chunk: {err}");

                    store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;

                    return Err(err.into());
                }
            };

            let chunk_content = new_first_chunk.content.clone();

            let reached_start = new_first_chunk.previous.is_none();

            if let Err(err) = self.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk) {
                error!("error when inserting the previous chunk into its linked chunk: {err}");

                store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;

                return Err(err.into());
            }

            let _ = self.room_linked_chunk.store_updates().take();

            let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();

            Ok(match chunk_content {
                ChunkContent::Gap(gap) => {
                    trace!("reloaded chunk from disk (gap)");
                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
                }

                ChunkContent::Items(events) => {
                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
                    LoadMoreEventsBackwardsOutcome::Events {
                        events,
                        timeline_event_diffs,
                        reached_start,
                    }
                }
            })
        }

        pub(super) async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
            let store_lock = self.store.lock().await?;

            let linked_chunk_id = LinkedChunkId::Room(&self.room);
            let (last_chunk, chunk_identifier_generator) =
                match store_lock.load_last_chunk(linked_chunk_id).await {
                    Ok(pair) => pair,

                    Err(err) => {
                        error!("error when reloading a linked chunk from memory: {err}");

                        store_lock
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        (None, ChunkIdentifierGenerator::new_from_scratch())
                    }
                };

            debug!("unloading the linked chunk, and resetting it to its last chunk");

            if let Err(err) =
                self.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
            {
                error!("error when replacing the linked chunk: {err}");
                return self.reset_internal().await;
            }

            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            let _ = self.room_linked_chunk.store_updates().take();

            Ok(())
        }

        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub(crate) async fn auto_shrink_if_no_subscribers(
            &mut self,
        ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
            let subscriber_count = self.subscriber_count.load(std::sync::atomic::Ordering::SeqCst);

            trace!(subscriber_count, "received request to auto-shrink");

            if subscriber_count == 0 {
                self.shrink_to_last_chunk().await?;
                Ok(Some(self.room_linked_chunk.updates_as_vector_diffs()))
            } else {
                Ok(None)
            }
        }

        #[cfg(test)]
        pub(crate) async fn force_shrink_to_last_chunk(
            &mut self,
        ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
            self.shrink_to_last_chunk().await?;
            Ok(self.room_linked_chunk.updates_as_vector_diffs())
        }

        pub(crate) fn room_event_order(&self, event_pos: Position) -> Option<usize> {
            self.room_linked_chunk.event_order(event_pos)
        }

        fn strip_relations_if_present<T>(event: &mut Raw<T>) {
            let mut closure = || -> Option<()> {
                let mut val: serde_json::Value = event.deserialize_as().ok()?;
                let unsigned = val.get_mut("unsigned")?;
                let unsigned_obj = unsigned.as_object_mut()?;
                if unsigned_obj.remove("m.relations").is_some() {
                    *event = Raw::new(&val).ok()?.cast_unchecked();
                }
                None
            };
            let _ = closure();
        }

        fn strip_relations_from_event(ev: &mut Event) {
            match &mut ev.kind {
                TimelineEventKind::Decrypted(decrypted) => {
                    decrypted.unsigned_encryption_info = None;

                    Self::strip_relations_if_present(&mut decrypted.event);
                }

                TimelineEventKind::UnableToDecrypt { event, .. }
                | TimelineEventKind::PlainText { event } => {
                    Self::strip_relations_if_present(event);
                }
            }
        }

        fn strip_relations_from_events(items: &mut [Event]) {
            for ev in items.iter_mut() {
                Self::strip_relations_from_event(ev);
            }
        }

        #[instrument(skip_all)]
        async fn remove_events(
            &mut self,
            in_memory_events: Vec<(OwnedEventId, Position)>,
            in_store_events: Vec<(OwnedEventId, Position)>,
        ) -> Result<(), EventCacheError> {
            if !in_store_events.is_empty() {
                let mut positions = in_store_events
                    .into_iter()
                    .map(|(_event_id, position)| position)
                    .collect::<Vec<_>>();

                sort_positions_descending(&mut positions);

                let updates = positions
                    .into_iter()
                    .map(|pos| Update::RemoveItem { at: pos })
                    .collect::<Vec<_>>();

                self.apply_store_only_updates(updates).await?;
            }

            if in_memory_events.is_empty() {
                return Ok(());
            }

            self.room_linked_chunk
                .remove_events_by_position(
                    in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
                )
                .expect("failed to remove an event");

            self.propagate_changes().await
        }

        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
            let updates = self.room_linked_chunk.store_updates().take();
            self.send_updates_to_store(updates).await
        }

        async fn apply_store_only_updates(
            &mut self,
            updates: Vec<Update<Event, Gap>>,
        ) -> Result<(), EventCacheError> {
            self.room_linked_chunk.order_tracker.map_updates(&updates);
            self.send_updates_to_store(updates).await
        }

        async fn send_updates_to_store(
            &mut self,
            mut updates: Vec<Update<Event, Gap>>,
        ) -> Result<(), EventCacheError> {
            if updates.is_empty() {
                return Ok(());
            }

            for update in updates.iter_mut() {
                match update {
                    Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
                    Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
                    Update::NewItemsChunk { .. }
                    | Update::NewGapChunk { .. }
                    | Update::RemoveChunk(_)
                    | Update::RemoveItem { .. }
                    | Update::DetachLastItems { .. }
                    | Update::StartReattachItems
                    | Update::EndReattachItems
                    | Update::Clear => {}
                }
            }

            let store = self.store.clone();
            let room_id = self.room.clone();
            let cloned_updates = updates.clone();

            spawn(async move {
                let store = store.lock().await?;

                trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
                let linked_chunk_id = LinkedChunkId::Room(&room_id);
                store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
                trace!("linked chunk updates applied");

                super::Result::Ok(())
            })
            .await
            .expect("joining failed")?;

            let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
                linked_chunk_id: OwnedLinkedChunkId::Room(self.room.clone()),
                updates,
            });

            Ok(())
        }

        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
            self.reset_internal().await?;

            let diff_updates = self.room_linked_chunk.updates_as_vector_diffs();

            debug_assert_eq!(diff_updates.len(), 1);
            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));

            Ok(diff_updates)
        }

        async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
            self.room_linked_chunk.reset();

            for thread in self.threads.values_mut() {
                thread.clear();
            }

            self.propagate_changes().await?;

            self.waited_for_initial_prev_token = false;
            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            Ok(())
        }

        pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
            &self.room_linked_chunk
        }

        pub fn rfind_map_event_in_memory_by<O, P>(&self, mut predicate: P) -> Option<O>
        where
            P: FnMut(&Event) -> Option<O>,
        {
            self.room_linked_chunk.revents().find_map(|(_position, event)| predicate(event))
        }

        pub async fn find_event(
            &self,
            event_id: &EventId,
        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
            for (position, event) in self.room_linked_chunk.revents() {
                if event.event_id().as_deref() == Some(event_id) {
                    return Ok(Some((EventLocation::Memory(position), event.clone())));
                }
            }

            let store = self.store.lock().await?;

            Ok(store
                .find_event(&self.room, event_id)
                .await?
                .map(|event| (EventLocation::Store, event)))
        }

        pub async fn find_event_with_relations(
            &self,
            event_id: &EventId,
            filters: Option<Vec<RelationType>>,
        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
            let store = self.store.lock().await?;

            let found = store.find_event(&self.room, event_id).await?;

            let Some(target) = found else {
                return Ok(None);
            };

            let mut related =
                store.find_event_relations(&self.room, event_id, filters.as_deref()).await?;
            let mut stack =
                related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();

            let mut already_seen = HashSet::new();
            already_seen.insert(event_id.to_owned());

            let mut num_iters = 1;

            while let Some(event_id) = stack.pop() {
                if !already_seen.insert(event_id.clone()) {
                    continue;
                }

                let other_related =
                    store.find_event_relations(&self.room, &event_id, filters.as_deref()).await?;

                stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
                related.extend(other_related);

                num_iters += 1;
            }

            trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");

            // Sort the related events by their position in the room's linked chunk;
            // events with no known position sort first.
            related.sort_by(|(_, lhs), (_, rhs)| {
                use std::cmp::Ordering;
                match (lhs, rhs) {
                    (None, None) => Ordering::Equal,
                    (None, Some(_)) => Ordering::Less,
                    (Some(_), None) => Ordering::Greater,
                    (Some(lhs), Some(rhs)) => {
                        let lhs = self.room_event_order(*lhs);
                        let rhs = self.room_event_order(*rhs);

                        match (lhs, rhs) {
                            (None, None) => Ordering::Equal,
                            (None, Some(_)) => Ordering::Less,
                            (Some(_), None) => Ordering::Greater,
                            (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
                        }
                    }
                }
            });

            let related = related.into_iter().map(|(event, _pos)| event).collect();

            Ok(Some((target, related)))
        }

        async fn post_process_new_events(
            &mut self,
            events: Vec<Event>,
            is_sync: bool,
        ) -> Result<(), EventCacheError> {
            self.propagate_changes().await?;

            let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();

            for event in events {
                self.maybe_apply_new_redaction(&event).await?;

                if self.enabled_thread_support {
                    if let Some(thread_root) = extract_thread_root(event.raw()) {
                        new_events_by_thread.entry(thread_root).or_default().push(event.clone());
                    } else if let Some(event_id) = event.event_id() {
                        if self.threads.contains_key(&event_id) {
                            new_events_by_thread.entry(event_id).or_default().push(event.clone());
                        }
                    }
                }

                if let Some(bundled_thread) = event.bundled_latest_thread_event {
                    self.save_event([*bundled_thread]).await?;
                }
            }

            self.update_threads(new_events_by_thread, is_sync).await?;

            Ok(())
        }

        fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
            self.threads.entry(root_event_id.clone()).or_insert_with(|| {
                ThreadEventCache::new(
                    self.room.clone(),
                    root_event_id,
                    self.linked_chunk_update_sender.clone(),
                )
            })
        }

        #[instrument(skip_all)]
        async fn update_threads(
            &mut self,
            new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
            is_sync: bool,
        ) -> Result<(), EventCacheError> {
            for (thread_root, new_events) in new_events_by_thread {
                let thread_cache = self.get_or_reload_thread(thread_root.clone());

                if is_sync {
                    thread_cache.add_live_events(new_events);
                }

                let last_event_id = thread_cache.latest_event_id();

                let Some((location, mut target_event)) = self.find_event(&thread_root).await?
                else {
                    trace!(%thread_root, "thread root event is missing from the linked chunk");
                    continue;
                };

                let prev_summary = target_event.thread_summary.summary();
                let mut latest_reply =
                    prev_summary.as_ref().and_then(|summary| summary.latest_reply.clone());

                let num_replies = {
                    let store_guard = &*self.store.lock().await?;
                    let related_thread_events = store_guard
                        .find_event_relations(
                            &self.room,
                            &thread_root,
                            Some(&[RelationType::Thread]),
                        )
                        .await?;
                    related_thread_events.len().try_into().unwrap_or(u32::MAX)
                };

                if let Some(last_event_id) = last_event_id {
                    latest_reply = Some(last_event_id);
                }

                let new_summary = ThreadSummary { num_replies, latest_reply };

                if prev_summary == Some(&new_summary) {
                    trace!(%thread_root, "thread summary is already up-to-date");
                    continue;
                }

                target_event.thread_summary = ThreadSummaryStatus::Some(new_summary);
                self.replace_event_at(location, target_event).await?;
            }

            Ok(())
        }

        async fn replace_event_at(
            &mut self,
            location: EventLocation,
            event: Event,
        ) -> Result<(), EventCacheError> {
            match location {
                EventLocation::Memory(position) => {
                    self.room_linked_chunk
                        .replace_event_at(position, event)
                        .expect("should have been a valid position of an item");
                    self.propagate_changes().await?;
                }
                EventLocation::Store => {
                    self.save_event([event]).await?;
                }
            }

            Ok(())
        }

        #[instrument(skip_all)]
        async fn maybe_apply_new_redaction(
            &mut self,
            event: &Event,
        ) -> Result<(), EventCacheError> {
            let raw_event = event.raw();

            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
                raw_event.get_field::<MessageLikeEventType>("type")
            else {
                return Ok(());
            };

            let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
                redaction,
            ))) = raw_event.deserialize()
            else {
                return Ok(());
            };

            let Some(event_id) = redaction.redacts(&self.room_version_rules.redaction) else {
                warn!("missing target event id from the redaction event");
                return Ok(());
            };

            let Some((location, mut target_event)) = self.find_event(event_id).await? else {
                trace!("redacted event is missing from the linked chunk");
                return Ok(());
            };

            // Don't redact an event that has already been redacted.
            if let Ok(deserialized) = target_event.raw().deserialize() {
                match deserialized {
                    AnySyncTimelineEvent::MessageLike(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                    AnySyncTimelineEvent::State(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                }
            }

            if let Some(redacted_event) = apply_redaction(
                target_event.raw(),
                event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
                &self.room_version_rules.redaction,
            ) {
                target_event.replace_raw(redacted_event.cast_unchecked());

                self.replace_event_at(location, target_event).await?;
            }

            Ok(())
        }

        pub async fn save_event(
            &self,
            events: impl IntoIterator<Item = Event>,
        ) -> Result<(), EventCacheError> {
            let store = self.store.clone();
            let room_id = self.room.clone();
            let events = events.into_iter().collect::<Vec<_>>();

            spawn(async move {
                let store = store.lock().await?;
                for event in events {
                    store.save_event(&room_id, event).await?;
                }
                super::Result::Ok(())
            })
            .await
            .expect("joining failed")?;

            Ok(())
        }

        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn handle_sync(
            &mut self,
            mut timeline: Timeline,
        ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
            let mut prev_batch = timeline.prev_batch.take();

            let DeduplicationOutcome {
                all_events: events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
                non_empty_all_duplicates: all_duplicates,
            } = filter_duplicate_events(
                &self.store,
                LinkedChunkId::Room(self.room.as_ref()),
                &self.room_linked_chunk,
                timeline.events,
            )
            .await?;

            // If the sync isn't limited and we already have events in memory, or if all
            // the new events are duplicates, the previous-batch token can be ignored.
            if !timeline.limited && self.room_linked_chunk.events().next().is_some()
                || all_duplicates
            {
                prev_batch = None;
            }

            if prev_batch.is_some() {
                // A gappy sync: the thread caches may now be missing events, so clear them
                // and reset the latest reply of their thread summaries.
                let mut summaries_to_update = Vec::new();

                for (thread_root, thread) in self.threads.iter_mut() {
                    thread.clear();

                    summaries_to_update.push(thread_root.clone());
                }

                for thread_root in summaries_to_update {
                    let Some((location, mut target_event)) = self.find_event(&thread_root).await?
                    else {
                        trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
                        continue;
                    };

                    if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
                        prev_summary.latest_reply = None;

                        target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);

                        self.replace_event_at(location, target_event).await?;
                    }
                }
            }

            if all_duplicates {
                return Ok((false, Vec::new()));
            }

            let has_new_gap = prev_batch.is_some();

            if !self.waited_for_initial_prev_token && has_new_gap {
                self.waited_for_initial_prev_token = true;
            }

            self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;

            self.room_linked_chunk
                .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);

            self.post_process_new_events(events, true).await?;

            if timeline.limited && has_new_gap {
                self.shrink_to_last_chunk().await?;
            }

            let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();

            Ok((has_new_gap, timeline_event_diffs))
        }

        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn handle_backpagination(
            &mut self,
            events: Vec<Event>,
            mut new_token: Option<String>,
            prev_token: Option<String>,
        ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
        {
            let prev_gap_id = if let Some(token) = prev_token {
                let gap_chunk_id = self.room_linked_chunk.chunk_identifier(|chunk| {
                    matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
                });

                if gap_chunk_id.is_none() {
                    return Ok(None);
                }

                gap_chunk_id
            } else {
                None
            };

            let DeduplicationOutcome {
                all_events: mut events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
                non_empty_all_duplicates: all_duplicates,
            } = filter_duplicate_events(
                &self.store,
                LinkedChunkId::Room(self.room.as_ref()),
                &self.room_linked_chunk,
                events,
            )
            .await?;

            if !all_duplicates {
                self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                    .await?;
            } else {
                events.clear();
                new_token = None;
            }

            // Back-pagination returns events newest first; reverse them so they're in
            // topological (oldest first) order.
            let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();

            let new_gap = new_token.map(|prev_token| Gap { prev_token });
            let reached_start = self.room_linked_chunk.finish_back_pagination(
                prev_gap_id,
                new_gap,
                &topo_ordered_events,
            );

            self.post_process_new_events(topo_ordered_events, false).await?;

            let event_diffs = self.room_linked_chunk.updates_as_vector_diffs();

            Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
        }

        pub fn subscribe_to_thread(
            &mut self,
            root: OwnedEventId,
        ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
            self.get_or_reload_thread(root).subscribe()
        }

        pub fn finish_thread_network_pagination(
            &mut self,
            root: OwnedEventId,
            prev_token: Option<String>,
            new_token: Option<String>,
            events: Vec<Event>,
        ) -> Option<BackPaginationOutcome> {
            self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
        }

        pub fn load_more_thread_events_backwards(
            &mut self,
            root: OwnedEventId,
        ) -> LoadMoreEventsBackwardsOutcome {
            self.get_or_reload_thread(root).load_more_events_backwards()
        }
    }
}

pub(super) enum EventLocation {
    Memory(Position),

    Store,
}

pub(super) use private::RoomEventCacheState;

#[cfg(test)]
mod tests {
    use matrix_sdk_base::event_cache::Event;
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id, RoomId,
    };

    use crate::test_utils::logged_in_client;

    #[async_test]
    async fn test_find_event_by_id_with_edit_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("* An edited event")
                .edit(
                    original_id,
                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
                )
                .event_id(related_id)
                .into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_thread_reply_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_reaction_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.reaction(original_id, ":D").event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_poll_response_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_poll_end_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_filtered_relationships() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        room_event_cache.save_events([original_event]).await;

        room_event_cache.save_events([related_event]).await;

        room_event_cache.save_events([associated_related_event]).await;

        let filter = Some(vec![RelationType::Replacement]);
        let (event, related_events) =
            room_event_cache.find_event_with_relations(original_id, filter).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        assert_eq!(related_events.len(), 1);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);

        let filter = Some(vec![RelationType::Thread]);
        let (event, related_events) =
            room_event_cache.find_event_with_relations(original_id, filter).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);
        assert!(related_events.is_empty());
    }

    #[async_test]
    async fn test_find_event_by_id_with_recursive_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        room_event_cache.save_events([original_event]).await;

        room_event_cache.save_events([related_event]).await;

        room_event_cache.save_events([associated_related_event]).await;

        let (event, related_events) =
            room_event_cache.find_event_with_relations(original_id, None).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);
    }

    async fn assert_relations(
        room_id: &RoomId,
        original_event: Event,
        related_event: Event,
        event_factory: EventFactory,
    ) {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let original_event_id = original_event.event_id().unwrap();
        room_event_cache.save_events([original_event]).await;

        let unrelated_id = event_id!("$2");
        room_event_cache
            .save_events([event_factory
                .text_msg("An unrelated event")
                .event_id(unrelated_id)
                .into()])
            .await;

        let related_id = related_event.event_id().unwrap();
        room_event_cache.save_events([related_event]).await;

        let (event, related_events) =
            room_event_cache.find_event_with_relations(&original_event_id, None).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_event_id);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
    }
}

2213#[cfg(all(test, not(target_family = "wasm")))] mod timed_tests {
2215 use std::sync::Arc;
2216
2217 use assert_matches::assert_matches;
2218 use assert_matches2::assert_let;
2219 use eyeball_im::VectorDiff;
2220 use futures_util::FutureExt;
2221 use matrix_sdk_base::{
2222 event_cache::{
2223 store::{EventCacheStore as _, MemoryStore},
2224 Gap,
2225 },
2226 linked_chunk::{
2227 lazy_loader::from_all_chunks, ChunkContent, ChunkIdentifier, LinkedChunkId, Position,
2228 Update,
2229 },
2230 store::StoreConfig,
2231 sync::{JoinedRoomUpdate, Timeline},
2232 };
2233 use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE, BOB};
2234 use ruma::{
2235 event_id,
2236 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2237 room_id, user_id, OwnedUserId,
2238 };
2239 use tokio::task::yield_now;
2240
2241 use super::RoomEventCacheGenericUpdate;
2242 use crate::{
2243 assert_let_timeout,
2244 event_cache::{room::LoadMoreEventsBackwardsOutcome, RoomEventCacheUpdate},
2245 test_utils::client::MockClientBuilder,
2246 };
2247
2248 #[async_test]
2249 async fn test_write_to_storage() {
2250 let room_id = room_id!("!galette:saucisse.bzh");
2251 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2252
2253 let event_cache_store = Arc::new(MemoryStore::new());
2254
2255 let client = MockClientBuilder::new(None)
2256 .on_builder(|builder| {
2257 builder.store_config(
2258 StoreConfig::new("hodlor".to_owned())
2259 .event_cache_store(event_cache_store.clone()),
2260 )
2261 })
2262 .build()
2263 .await;
2264
2265 let event_cache = client.event_cache();
2266
2267 event_cache.subscribe().unwrap();
2269
2270 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2271 let room = client.get_room(room_id).unwrap();
2272
2273 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2274 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2275
2276 let timeline = Timeline {
2278 limited: true,
2279 prev_batch: Some("raclette".to_owned()),
2280 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
2281 };
2282
2283 room_event_cache
2284 .inner
2285 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2286 .await
2287 .unwrap();
2288
2289 assert_matches!(
2291 generic_stream.recv().await,
2292 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2293 assert_eq!(expected_room_id, room_id);
2294 }
2295 );
2296
2297 let linked_chunk = from_all_chunks::<3, _, _>(
2299 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2300 )
2301 .unwrap()
2302 .unwrap();
2303
2304 assert_eq!(linked_chunk.chunks().count(), 2);
2305
2306 let mut chunks = linked_chunk.chunks();
2307
2308 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
2310 assert_eq!(gap.prev_token, "raclette");
2311 });
2312
2313 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2315 assert_eq!(events.len(), 1);
2316 let deserialized = events[0].raw().deserialize().unwrap();
2317 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
2318 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
2319 });
2320
2321 assert!(chunks.next().is_none());
2323 }
2324
2325 #[async_test]
2326 async fn test_write_to_storage_strips_bundled_relations() {
2327 let room_id = room_id!("!galette:saucisse.bzh");
2328 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2329
2330 let event_cache_store = Arc::new(MemoryStore::new());
2331
2332 let client = MockClientBuilder::new(None)
2333 .on_builder(|builder| {
2334 builder.store_config(
2335 StoreConfig::new("hodlor".to_owned())
2336 .event_cache_store(event_cache_store.clone()),
2337 )
2338 })
2339 .build()
2340 .await;
2341
2342 let event_cache = client.event_cache();
2343
2344 event_cache.subscribe().unwrap();
2346
2347 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2348 let room = client.get_room(room_id).unwrap();
2349
2350 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2351 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2352
2353 let ev = f
2355 .text_msg("hey yo")
2356 .sender(*ALICE)
2357 .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
2358 .into_event();
2359
2360 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
2361
2362 room_event_cache
2363 .inner
2364 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2365 .await
2366 .unwrap();
2367
2368 assert_matches!(
2370 generic_stream.recv().await,
2371 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2372 assert_eq!(expected_room_id, room_id);
2373 }
2374 );
2375
2376 {
2378 let events = room_event_cache.events().await;
2379
2380 assert_eq!(events.len(), 1);
2381
2382 let ev = events[0].raw().deserialize().unwrap();
2383 assert_let!(
2384 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
2385 );
2386
2387 let original = msg.as_original().unwrap();
2388 assert_eq!(original.content.body(), "hey yo");
2389 assert!(original.unsigned.relations.replace.is_some());
2390 }
2391
2392 let linked_chunk = from_all_chunks::<3, _, _>(
2394 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2395 )
2396 .unwrap()
2397 .unwrap();
2398
2399 assert_eq!(linked_chunk.chunks().count(), 1);
2400
2401 let mut chunks = linked_chunk.chunks();
2402 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2403 assert_eq!(events.len(), 1);
2404
2405 let ev = events[0].raw().deserialize().unwrap();
2406 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
2407
2408 let original = msg.as_original().unwrap();
2409 assert_eq!(original.content.body(), "hey yo");
2410 assert!(original.unsigned.relations.replace.is_none());
2411 });
2412
2413 assert!(chunks.next().is_none());
2415 }
2416
    #[async_test]
    async fn test_clear() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

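        // Prefill the store with a linked chunk: an empty chunk, a gap, then two chunks of
        // one event each.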
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "comté".to_owned() },
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("hodlor".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, mut stream) = room_event_cache.subscribe().await;
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

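        // Sanity check: the cache knows about both events.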
        {
            assert!(room_event_cache.find_event(event_id1).await.is_some());
            assert!(room_event_cache.find_event(event_id2).await.is_some());
        }

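        // The initial subscription only sees the last event, since only the last chunk is
        // loaded from the store.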
        {
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].event_id().unwrap(), event_id2);

            assert!(stream.is_empty());
        }

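        // Paginating backwards loads the previous event.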
        {
            room_event_cache.pagination().run_backwards_once(20).await.unwrap();

            assert_let_timeout!(
                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
            );
            assert_eq!(diffs.len(), 1);
            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
                assert_eq!(event.event_id().unwrap(), event_id1);
            });

            assert!(stream.is_empty());
        }

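        // Now, clear the room.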
        room_event_cache.clear().await.unwrap();

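        // Subscribers are notified that the timeline has been cleared, and a generic update
        // is emitted as well.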
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_let!(VectorDiff::Clear = &diffs[0]);

        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
        );
        assert_eq!(received_room_id, room_id);

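        // Individual events are not forgotten by the event cache after clearing the room.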
        assert!(room_event_cache.find_event(event_id1).await.is_some());

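        // But the room's timeline is now empty, both in memory and in the store.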
        let items = room_event_cache.events().await;
        assert!(items.is_empty());

        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.num_items(), 0);
    }

    #[async_test]
    async fn test_load_from_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

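        // Prefill the store with a linked chunk: an empty chunk, a gap, then two chunks of
        // one event each.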
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "cheddar".to_owned() },
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("hodlor".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

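        // Loading the room from the store emits a generic update.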
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let (items, mut stream) = room_event_cache.subscribe().await;

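        // The initial subscription sees only the last event; the other events are known to
        // the cache but not yet loaded in memory.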
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].event_id().unwrap(), event_id2);
        assert!(stream.is_empty());

        assert!(room_event_cache.find_event(event_id1).await.is_some());
        assert!(room_event_cache.find_event(event_id2).await.is_some());

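        // Paginating backwards loads the first event into memory and notifies subscribers.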
        room_event_cache.pagination().run_backwards_once(20).await.unwrap();

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
            assert_eq!(event.event_id().unwrap(), event_id1);
        });

        assert!(stream.is_empty());

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );

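        // Now sync the second event again: it's a duplicate, so it gets deduplicated.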
        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

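        // Handling a sync that only contains an already-known event doesn't emit a generic
        // update.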
        assert!(generic_stream.recv().now_or_never().is_none());

        let items = room_event_cache.events().await;
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].event_id().unwrap(), event_id1);
        assert_eq!(items[1].event_id().unwrap(), event_id2);
    }

    #[async_test]
    async fn test_load_from_storage_resilient_to_failure() {
        let room_id = room_id!("!fondue:patate.ch");
        let event_cache_store = Arc::new(MemoryStore::new());

        let event = EventFactory::new()
            .room(room_id)
            .sender(user_id!("@ben:saucisse.bzh"))
            .text_msg("foo")
            .event_id(event_id!("$42"))
            .into_event();

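        // Prefill the store with a malformed linked chunk: the second chunk's `next` points
        // back to the first chunk.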
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: Some(ChunkIdentifier::new(0)),
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("holder".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let items = room_event_cache.events().await;

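        // Because the stored linked chunk is malformed, it is discarded and the room starts
        // with an empty timeline.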
        assert!(items.is_empty());

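        // The faulty linked chunk has been removed from the store as well.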
        let raw_chunks =
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
        assert!(raw_chunks.is_empty());
    }

    #[async_test]
    async fn test_no_useless_gaps() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        let f = EventFactory::new().room(room_id).sender(*ALICE);

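        // Sync a limited timeline with one event and a prev-batch token.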
        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: true,
                    prev_batch: Some("raclette".to_owned()),
                    events: vec![f.text_msg("hey yo").into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );

        {
            let mut state = room_event_cache.inner.state.write().await;

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

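            // The gap from the limited sync has been persisted, but it hasn't been loaded
            // into memory yet.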
            assert_eq!(num_gaps, 0);
            assert_eq!(num_events, 1);

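            // Loading one more chunk backwards brings that gap into memory.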
            assert_matches!(
                state.load_more_events_backwards().await.unwrap(),
                LoadMoreEventsBackwardsOutcome::Gap { .. }
            );

            num_gaps = 0;
            num_events = 0;
            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 1);
        }

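        // Sync a non-limited timeline with another event and a new prev-batch token.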
        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: false,
                    prev_batch: Some("fondue".to_owned()),
                    events: vec![f.text_msg("sup").into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );

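        // The new token isn't stored as a gap: a non-limited sync leaves no hole in the
        // timeline, so only the previous gap remains.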
        {
            let state = room_event_cache.inner.state.read().await;

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(gap) => {
                        assert_eq!(gap.prev_token, "raclette");
                        num_gaps += 1;
                    }
                }
            }

            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 2);
        }
    }

    #[async_test]
    async fn test_shrink_to_last_chunk() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();

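        // Fill the event cache store with two chunks, one event each.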
        {
            let store = client.event_cache_store();
            let store = store.lock().await.unwrap();
            store
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

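        // Sanity check: only the last event is loaded at first.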
        let (events, mut stream) = room_event_cache.subscribe().await;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
        assert!(stream.is_empty());

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

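        // Paginating backwards reaches the start and loads the first event.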
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
            assert_eq!(value.event_id().as_deref(), Some(evid1));
        });

        assert!(stream.is_empty());

        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
        );
        assert_eq!(received_room_id, room_id);

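        // Force-shrink the linked chunk down to its last chunk.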
        let diffs = room_event_cache
            .inner
            .state
            .write()
            .await
            .force_shrink_to_last_chunk()
            .await
            .expect("shrinking should succeed");

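        // Shrinking yields a clear followed by the re-insertion of the last event.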
        assert_eq!(diffs.len(), 2);
        assert_matches!(&diffs[0], VectorDiff::Clear);
        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
            assert_eq!(values.len(), 1);
            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
        });

        assert!(stream.is_empty());

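        // No generic update is emitted for the shrink.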
        assert!(generic_stream.is_empty());

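        // Only the last event remains loaded in memory.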
        let events = room_event_cache.events().await;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));

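        // Paginating again reloads the first event from the store.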
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);
    }

    #[async_test]
    async fn test_room_ordering() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id).sender(*ALICE);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");
        let evid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
        let ev3 = f.text_msg("yo").event_id(evid3).into_event();

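        // Fill the store with two chunks: [ev1, ev2], then [ev3].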
        {
            let store = client.event_cache_store();
            let store = store.lock().await.unwrap();
            store
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1, ev2],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev3.clone()],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

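        // Initially, only the last chunk is loaded in memory, but the relative order of all
        // known events is already available.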
        {
            let state = room_event_cache.inner.state.read().await;

            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));

            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));

            let mut events = state.room_linked_chunk().events();

            let (pos, ev) = events.next().unwrap();
            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
            assert_eq!(ev.event_id().as_deref(), Some(evid3));
            assert_eq!(state.room_event_order(pos), Some(2));

            assert!(events.next().is_none());
        }

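        // Paginating backwards loads the remaining events; orders and positions stay
        // consistent.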
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert!(outcome.reached_start);

        {
            let state = room_event_cache.inner.state.read().await;
            for (i, (pos, _)) in state.room_linked_chunk().events().enumerate() {
                assert_eq!(state.room_event_order(pos), Some(i));
            }
        }

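        // Sync a limited timeline containing a duplicate of ev3 plus a new event: the
        // duplicate is removed from its old position and re-appended.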
        let evid4 = event_id!("$4");

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: true,
                    prev_batch: Some("fondue".to_owned()),
                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

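        // ev3 has moved: its old position no longer maps to any order, while ev1 and ev2
        // keep theirs.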
        {
            let state = room_event_cache.inner.state.read().await;

            let mut events = state.room_linked_chunk().events();

            let (pos, ev) = events.next().unwrap();
            assert_eq!(ev.event_id().as_deref(), Some(evid3));
            assert_eq!(state.room_event_order(pos), Some(2));

            let (pos, ev) = events.next().unwrap();
            assert_eq!(ev.event_id().as_deref(), Some(evid4));
            assert_eq!(state.room_event_order(pos), Some(3));

            assert!(events.next().is_none());

            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));
            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));

            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(1), 0)), None);
        }
    }

    #[async_test]
    async fn test_auto_shrink_after_all_subscribers_are_gone() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();

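        // Fill the event cache store with two chunks, one event each.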
        {
            let store = client.event_cache_store();
            let store = store.lock().await.unwrap();
            store
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

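        // The first subscriber initially sees only the last event.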
        let (events1, mut stream1) = room_event_cache.subscribe().await;
        assert_eq!(events1.len(), 1);
        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
        assert!(stream1.is_empty());

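        // Paginating backwards loads the first event and notifies the first subscriber.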
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
            assert_eq!(value.event_id().as_deref(), Some(evid1));
        });

        assert!(stream1.is_empty());

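        // A second subscriber sees both loaded events.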
        let (events2, stream2) = room_event_cache.subscribe().await;
        assert_eq!(events2.len(), 2);
        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
        assert!(stream2.is_empty());

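        // Dropping the first subscriber doesn't shrink anything: another subscriber is still
        // alive.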
        drop(stream1);
        yield_now().await;

        assert!(stream2.is_empty());

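        // Dropping the last subscriber triggers the auto-shrink.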
        drop(stream2);
        yield_now().await;

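        // Sanity check: no subscribers remain.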
        {
            let state = room_event_cache.inner.state.read().await;
            assert_eq!(state.subscriber_count.load(std::sync::atomic::Ordering::SeqCst), 0);
        }

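        // The room event cache has shrunk back to its last chunk: only the last event
        // remains loaded.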
        let events3 = room_event_cache.events().await;
        assert_eq!(events3.len(), 1);
        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
    }

    #[async_test]
    async fn test_rfind_map_event_in_memory_by() {
        let user_id = user_id!("@mnt_io:matrix.org");
        let room_id = room_id!("!raclette:patate.ch");
        let client = MockClientBuilder::new(None).build().await;

        let event_factory = EventFactory::new().room(room_id);

        let event_id_0 = event_id!("$ev0");
        let event_id_1 = event_id!("$ev1");
        let event_id_2 = event_id!("$ev2");
        let event_id_3 = event_id!("$ev3");

        let event_0 =
            event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
        let event_1 =
            event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
        let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
        let event_3 =
            event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();

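        // Fill the store with two chunks: event_3 alone, then events 0, 1 and 2. Only the
        // last chunk gets loaded in memory.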
        {
            let store = client.event_cache_store();
            let store = store.lock().await.unwrap();
            store
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![event_3],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![event_0, event_1, event_2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

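        // Searching from the end, the most recent in-memory event sent by BOB is event 0.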
        assert_matches!(
            room_event_cache
                .rfind_map_event_in_memory_by(|event| {
                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
                        == Some(*BOB))
                    .then(|| event.event_id())
                })
                .await,
            Some(event_id) => {
                assert_eq!(event_id.as_deref(), Some(event_id_0));
            }
        );

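        // The most recent in-memory event sent by ALICE is event 2.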
        assert_matches!(
            room_event_cache
                .rfind_map_event_in_memory_by(|event| {
                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
                        == Some(*ALICE))
                    .then(|| event.event_id())
                })
                .await,
            Some(event_id) => {
                assert_eq!(event_id.as_deref(), Some(event_id_2));
            }
        );

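        // event_3, sent by `user_id`, lives in the first chunk, which isn't loaded in
        // memory, so it cannot be found.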
        assert!(room_event_cache
            .rfind_map_event_in_memory_by(|event| {
                (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
                    == Some(user_id))
                .then(|| event.event_id())
            })
            .await
            .is_none());

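        // A predicate that never matches finds nothing.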
        assert!(room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.is_none());
    }
}