1use std::{
18 collections::BTreeMap,
19 fmt,
20 ops::{Deref, DerefMut},
21 sync::{
22 Arc,
23 atomic::{AtomicUsize, Ordering},
24 },
25};
26
27use events::sort_positions_descending;
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31 deserialized_responses::AmbiguityChange,
32 event_cache::Event,
33 linked_chunk::Position,
34 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
35};
36use ruma::{
37 EventId, OwnedEventId, OwnedRoomId,
38 api::Direction,
39 events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
40 serde::Raw,
41};
42use tokio::sync::{
43 Notify, RwLock,
44 broadcast::{Receiver, Sender},
45 mpsc,
46};
47use tracing::{instrument, trace, warn};
48
49use super::{
50 AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
51 RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
52};
53use crate::{
54 client::WeakClient,
55 event_cache::EventCacheError,
56 room::{IncludeRelations, RelationsOptions, WeakRoom},
57};
58
59pub(super) mod events;
60mod threads;
61
62pub use threads::ThreadEventCacheUpdate;
63
64#[derive(Clone)]
68pub struct RoomEventCache {
69 pub(super) inner: Arc<RoomEventCacheInner>,
70}
71
72impl fmt::Debug for RoomEventCache {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 f.debug_struct("RoomEventCache").finish_non_exhaustive()
75 }
76}
77
78#[allow(missing_debug_implementations)]
88pub struct RoomEventCacheSubscriber {
89 recv: Receiver<RoomEventCacheUpdate>,
91
92 room_id: OwnedRoomId,
94
95 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
97
98 subscriber_count: Arc<AtomicUsize>,
100}
101
102impl Drop for RoomEventCacheSubscriber {
103 fn drop(&mut self) {
104 let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
105
106 trace!(
107 "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
108 );
109
110 if previous_subscriber_count == 1 {
111 let mut room_id = self.room_id.clone();
115
116 let mut num_attempts = 0;
122
123 while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
124 num_attempts += 1;
125
126 if num_attempts > 1024 {
127 warn!(
130 "couldn't send notification to the auto-shrink channel \
131 after 1024 attempts; giving up"
132 );
133 return;
134 }
135
136 match err {
137 mpsc::error::TrySendError::Full(stolen_room_id) => {
138 room_id = stolen_room_id;
139 }
140 mpsc::error::TrySendError::Closed(_) => return,
141 }
142 }
143
144 trace!("sent notification to the parent channel that we were the last subscriber");
145 }
146 }
147}
148
149impl Deref for RoomEventCacheSubscriber {
150 type Target = Receiver<RoomEventCacheUpdate>;
151
152 fn deref(&self) -> &Self::Target {
153 &self.recv
154 }
155}
156
157impl DerefMut for RoomEventCacheSubscriber {
158 fn deref_mut(&mut self) -> &mut Self::Target {
159 &mut self.recv
160 }
161}
162
163impl RoomEventCache {
164 pub(super) fn new(
166 client: WeakClient,
167 state: RoomEventCacheState,
168 pagination_status: SharedObservable<RoomPaginationStatus>,
169 room_id: OwnedRoomId,
170 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
171 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
172 ) -> Self {
173 Self {
174 inner: Arc::new(RoomEventCacheInner::new(
175 client,
176 state,
177 pagination_status,
178 room_id,
179 auto_shrink_sender,
180 generic_update_sender,
181 )),
182 }
183 }
184
185 pub async fn events(&self) -> Vec<Event> {
190 let state = self.inner.state.read().await;
191
192 state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect()
193 }
194
195 pub async fn subscribe(&self) -> (Vec<Event>, RoomEventCacheSubscriber) {
202 let state = self.inner.state.read().await;
203 let events =
204 state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
205
206 let previous_subscriber_count = state.subscriber_count.fetch_add(1, Ordering::SeqCst);
207 trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
208
209 let recv = self.inner.sender.subscribe();
210 let subscriber = RoomEventCacheSubscriber {
211 recv,
212 room_id: self.inner.room_id.clone(),
213 auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
214 subscriber_count: state.subscriber_count.clone(),
215 };
216
217 (events, subscriber)
218 }
219
220 pub async fn subscribe_to_thread(
223 &self,
224 thread_root: OwnedEventId,
225 ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
226 let mut state = self.inner.state.write().await;
227 state.subscribe_to_thread(thread_root)
228 }
229
230 #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
235 pub async fn paginate_thread_backwards(
236 &self,
237 thread_root: OwnedEventId,
238 num_events: u16,
239 ) -> Result<bool> {
240 let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
241
242 let mut outcome =
244 self.inner.state.write().await.load_more_thread_events_backwards(thread_root.clone());
245
246 loop {
247 match outcome {
248 LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
249 let options = RelationsOptions {
251 from: prev_token.clone(),
252 dir: Direction::Backward,
253 limit: Some(num_events.into()),
254 include_relations: IncludeRelations::AllRelations,
255 recurse: true,
256 };
257
258 let mut result = room
259 .relations(thread_root.clone(), options)
260 .await
261 .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
262
263 let reached_start = result.next_batch_token.is_none();
264 trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");
265
266 let root_event =
269 if reached_start {
270 Some(room.load_or_fetch_event(&thread_root, None).await.map_err(
272 |err| EventCacheError::BackpaginationError(Box::new(err)),
273 )?)
274 } else {
275 None
276 };
277
278 let mut state = self.inner.state.write().await;
279
280 state.save_events(result.chunk.iter().cloned()).await?;
282
283 result.chunk.extend(root_event);
286
287 if let Some(outcome) = state.finish_thread_network_pagination(
288 thread_root.clone(),
289 prev_token,
290 result.next_batch_token,
291 result.chunk,
292 ) {
293 return Ok(outcome.reached_start);
294 }
295
296 outcome = state.load_more_thread_events_backwards(thread_root.clone());
298 }
299
300 LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
301 return Ok(true);
303 }
304
305 LoadMoreEventsBackwardsOutcome::Events { .. } => {
306 unimplemented!("loading from disk for threads is not implemented yet");
308 }
309 }
310 }
311 }
312
313 pub fn pagination(&self) -> RoomPagination {
316 RoomPagination { inner: self.inner.clone() }
317 }
318
319 pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Option<O>
325 where
326 P: FnMut(&Event) -> Option<O>,
327 {
328 self.inner.state.read().await.rfind_map_event_in_memory_by(predicate)
329 }
330
331 pub async fn find_event(&self, event_id: &EventId) -> Option<Event> {
336 self.inner
337 .state
338 .read()
339 .await
340 .find_event(event_id)
341 .await
342 .ok()
343 .flatten()
344 .map(|(_loc, event)| event)
345 }
346
347 pub async fn find_event_with_relations(
359 &self,
360 event_id: &EventId,
361 filter: Option<Vec<RelationType>>,
362 ) -> Option<(Event, Vec<Event>)> {
363 self.inner
365 .state
366 .read()
367 .await
368 .find_event_with_relations(event_id, filter.clone())
369 .await
370 .ok()
371 .flatten()
372 }
373
374 pub async fn clear(&self) -> Result<()> {
379 let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;
381
382 let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
384 diffs: updates_as_vector_diffs,
385 origin: EventsOrigin::Cache,
386 });
387
388 let _ = self
390 .inner
391 .generic_update_sender
392 .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });
393
394 Ok(())
395 }
396
397 pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
400 if let Err(err) = self.inner.state.write().await.save_events(events).await {
401 warn!("couldn't save event in the event cache: {err}");
402 }
403 }
404
405 pub async fn debug_string(&self) -> Vec<String> {
408 self.inner.state.read().await.room_linked_chunk().debug_string()
409 }
410}
411
412pub(super) struct RoomEventCacheInner {
414 pub(super) room_id: OwnedRoomId,
416
417 pub weak_room: WeakRoom,
418
419 pub sender: Sender<RoomEventCacheUpdate>,
421
422 pub state: RwLock<RoomEventCacheState>,
424
425 pub pagination_batch_token_notifier: Notify,
427
428 pub pagination_status: SharedObservable<RoomPaginationStatus>,
429
430 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
435
436 pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
442}
443
444impl RoomEventCacheInner {
445 fn new(
448 client: WeakClient,
449 state: RoomEventCacheState,
450 pagination_status: SharedObservable<RoomPaginationStatus>,
451 room_id: OwnedRoomId,
452 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
453 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
454 ) -> Self {
455 let sender = Sender::new(32);
456 let weak_room = WeakRoom::new(client, room_id);
457 Self {
458 room_id: weak_room.room_id().to_owned(),
459 weak_room,
460 state: RwLock::new(state),
461 sender,
462 pagination_batch_token_notifier: Default::default(),
463 auto_shrink_sender,
464 pagination_status,
465 generic_update_sender,
466 }
467 }
468
469 fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
470 if account_data.is_empty() {
471 return;
472 }
473
474 let mut handled_read_marker = false;
475
476 trace!("Handling account data");
477
478 for raw_event in account_data {
479 match raw_event.deserialize() {
480 Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
481 if handled_read_marker {
484 continue;
485 }
486
487 handled_read_marker = true;
488
489 let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
491 event_id: ev.content.event_id,
492 });
493 }
494
495 Ok(_) => {
496 }
499
500 Err(e) => {
501 let event_type = raw_event.get_field::<String>("type").ok().flatten();
502 warn!(event_type, "Failed to deserialize account data: {e}");
503 }
504 }
505 }
506 }
507
508 #[instrument(skip_all, fields(room_id = %self.room_id))]
509 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
510 self.handle_timeline(
511 updates.timeline,
512 updates.ephemeral.clone(),
513 updates.ambiguity_changes,
514 )
515 .await?;
516 self.handle_account_data(updates.account_data);
517
518 Ok(())
519 }
520
521 #[instrument(skip_all, fields(room_id = %self.room_id))]
522 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
523 self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
524
525 Ok(())
526 }
527
528 async fn handle_timeline(
531 &self,
532 timeline: Timeline,
533 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
534 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
535 ) -> Result<()> {
536 if timeline.events.is_empty()
537 && timeline.prev_batch.is_none()
538 && ephemeral_events.is_empty()
539 && ambiguity_changes.is_empty()
540 {
541 return Ok(());
542 }
543
544 trace!("adding new events");
546
547 let (stored_prev_batch_token, timeline_event_diffs) =
548 self.state.write().await.handle_sync(timeline).await?;
549
550 if stored_prev_batch_token {
553 self.pagination_batch_token_notifier.notify_one();
554 }
555
556 if !timeline_event_diffs.is_empty() {
559 let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
560 diffs: timeline_event_diffs,
561 origin: EventsOrigin::Sync,
562 });
563
564 let _ = self
565 .generic_update_sender
566 .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
567 }
568
569 if !ephemeral_events.is_empty() {
570 let _ = self
571 .sender
572 .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
573 }
574
575 if !ambiguity_changes.is_empty() {
576 let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
577 }
578
579 Ok(())
580 }
581}
582
583#[derive(Debug)]
586pub(super) enum LoadMoreEventsBackwardsOutcome {
587 Gap {
589 prev_token: Option<String>,
592 },
593
594 StartOfTimeline,
596
597 Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
599}
600
601mod private {
603 use std::{
604 collections::{BTreeMap, HashMap, HashSet},
605 sync::{Arc, atomic::AtomicUsize},
606 };
607
608 use eyeball::SharedObservable;
609 use eyeball_im::VectorDiff;
610 use matrix_sdk_base::{
611 apply_redaction,
612 deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
613 event_cache::{
614 Event, Gap,
615 store::{DynEventCacheStore, EventCacheStoreLock},
616 },
617 linked_chunk::{
618 ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
619 OwnedLinkedChunkId, Position, Update,
620 lazy_loader::{self},
621 },
622 serde_helpers::{extract_edit_target, extract_thread_root},
623 sync::Timeline,
624 };
625 use matrix_sdk_common::executor::spawn;
626 use ruma::{
627 EventId, OwnedEventId, OwnedRoomId,
628 events::{
629 AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
630 relation::RelationType, room::redaction::SyncRoomRedactionEvent,
631 },
632 room_version_rules::RoomVersionRules,
633 serde::Raw,
634 };
635 use tokio::sync::broadcast::{Receiver, Sender};
636 use tracing::{debug, error, instrument, trace, warn};
637
638 use super::{
639 super::{EventCacheError, deduplicator::DeduplicationOutcome},
640 EventLocation, LoadMoreEventsBackwardsOutcome,
641 events::EventLinkedChunk,
642 sort_positions_descending,
643 };
644 use crate::event_cache::{
645 BackPaginationOutcome, RoomEventCacheLinkedChunkUpdate, RoomPaginationStatus,
646 ThreadEventCacheUpdate, deduplicator::filter_duplicate_events,
647 room::threads::ThreadEventCache,
648 };
649
650 pub struct RoomEventCacheState {
655 room: OwnedRoomId,
657
658 room_version_rules: RoomVersionRules,
660
661 enabled_thread_support: bool,
663
664 store: EventCacheStoreLock,
666
667 room_linked_chunk: EventLinkedChunk,
670
671 threads: HashMap<OwnedEventId, ThreadEventCache>,
675
676 pub waited_for_initial_prev_token: bool,
681
682 pagination_status: SharedObservable<RoomPaginationStatus>,
683
684 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
687
688 pub(super) subscriber_count: Arc<AtomicUsize>,
691 }
692
693 impl RoomEventCacheState {
694 pub async fn new(
704 room_id: OwnedRoomId,
705 room_version_rules: RoomVersionRules,
706 enabled_thread_support: bool,
707 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
708 store: EventCacheStoreLock,
709 pagination_status: SharedObservable<RoomPaginationStatus>,
710 ) -> Result<Self, EventCacheError> {
711 let store_lock = store.lock().await?;
712
713 let linked_chunk_id = LinkedChunkId::Room(&room_id);
714
715 let full_linked_chunk_metadata =
720 match Self::load_linked_chunk_metadata(&*store_lock, linked_chunk_id).await {
721 Ok(metas) => metas,
722 Err(err) => {
723 error!(
724 "error when loading a linked chunk's metadata from the store: {err}"
725 );
726
727 store_lock
729 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
730 .await?;
731
732 None
734 }
735 };
736
737 let linked_chunk = match store_lock
738 .load_last_chunk(linked_chunk_id)
739 .await
740 .map_err(EventCacheError::from)
741 .and_then(|(last_chunk, chunk_identifier_generator)| {
742 lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
743 .map_err(EventCacheError::from)
744 }) {
745 Ok(linked_chunk) => linked_chunk,
746 Err(err) => {
747 error!(
748 "error when loading a linked chunk's latest chunk from the store: {err}"
749 );
750
751 store_lock
753 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
754 .await?;
755
756 None
757 }
758 };
759
760 let room_linked_chunk = EventLinkedChunk::with_initial_linked_chunk(
761 linked_chunk,
762 full_linked_chunk_metadata,
763 );
764
765 let threads = HashMap::new();
770
771 Ok(Self {
772 room: room_id,
773 room_version_rules,
774 enabled_thread_support,
775 store,
776 room_linked_chunk,
777 threads,
778 waited_for_initial_prev_token: false,
779 subscriber_count: Default::default(),
780 pagination_status,
781 linked_chunk_update_sender,
782 })
783 }
784
785 async fn load_linked_chunk_metadata(
791 store: &DynEventCacheStore,
792 linked_chunk_id: LinkedChunkId<'_>,
793 ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
794 let mut all_chunks = store
795 .load_all_chunks_metadata(linked_chunk_id)
796 .await
797 .map_err(EventCacheError::from)?;
798
799 if all_chunks.is_empty() {
800 return Ok(None);
802 }
803
804 let chunk_map: HashMap<_, _> =
806 all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();
807
808 let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
810 let Some(last) = iter.next() else {
811 return Err(EventCacheError::InvalidLinkedChunkMetadata {
812 details: "no last chunk found".to_owned(),
813 });
814 };
815
816 if let Some(other_last) = iter.next() {
818 return Err(EventCacheError::InvalidLinkedChunkMetadata {
819 details: format!(
820 "chunks {} and {} both claim to be last chunks",
821 last.identifier.index(),
822 other_last.identifier.index()
823 ),
824 });
825 }
826
827 let mut seen = HashSet::new();
830 let mut current = last;
831 loop {
832 if !seen.insert(current.identifier) {
834 return Err(EventCacheError::InvalidLinkedChunkMetadata {
835 details: format!(
836 "cycle detected in linked chunk at {}",
837 current.identifier.index()
838 ),
839 });
840 }
841
842 let Some(prev_id) = current.previous else {
843 if seen.len() != all_chunks.len() {
845 return Err(EventCacheError::InvalidLinkedChunkMetadata {
846 details: format!(
847 "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
848 seen.len(),
849 all_chunks.len()
850 ),
851 });
852 }
853 break;
854 };
855
856 let Some(pred_meta) = chunk_map.get(&prev_id) else {
859 return Err(EventCacheError::InvalidLinkedChunkMetadata {
860 details: format!(
861 "missing predecessor {} chunk for {}",
862 prev_id.index(),
863 current.identifier.index()
864 ),
865 });
866 };
867
868 if pred_meta.next != Some(current.identifier) {
870 return Err(EventCacheError::InvalidLinkedChunkMetadata {
871 details: format!(
872 "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
873 pred_meta.identifier.index(),
874 pred_meta.next.map(|chunk_id| chunk_id.index()),
875 current.identifier.index()
876 ),
877 });
878 }
879
880 current = *pred_meta;
881 }
882
883 let mut current = current.identifier;
891 for i in 0..all_chunks.len() {
892 let j = all_chunks
894 .iter()
895 .rev()
896 .position(|meta| meta.identifier == current)
897 .map(|j| all_chunks.len() - 1 - j)
898 .expect("the target chunk must be present in the metadata");
899 if i != j {
900 all_chunks.swap(i, j);
901 }
902 if let Some(next) = all_chunks[i].next {
903 current = next;
904 }
905 }
906
907 Ok(Some(all_chunks))
908 }
909
910 pub(in super::super) async fn load_more_events_backwards(
912 &mut self,
913 ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
914 if let Some(prev_token) = self.room_linked_chunk.rgap().map(|gap| gap.prev_token) {
917 return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
918 }
919
920 let store = self.store.lock().await?;
921
922 let prev_first_chunk =
923 self.room_linked_chunk.chunks().next().expect("a linked chunk is never empty");
924
925 let linked_chunk_id = LinkedChunkId::Room(&self.room);
927 let new_first_chunk = match store
928 .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
929 .await
930 {
931 Ok(Some(new_first_chunk)) => {
932 new_first_chunk
934 }
935
936 Ok(None) => {
937 if self.room_linked_chunk.events().next().is_some() {
942 trace!("chunk is fully loaded and non-empty: reached_start=true");
945 return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
946 }
947
948 return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: None });
950 }
951
952 Err(err) => {
953 error!("error when loading the previous chunk of a linked chunk: {err}");
954
955 store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;
957
958 return Err(err.into());
960 }
961 };
962
963 let chunk_content = new_first_chunk.content.clone();
964
965 let reached_start = new_first_chunk.previous.is_none();
971
972 if let Err(err) = self.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk) {
973 error!("error when inserting the previous chunk into its linked chunk: {err}");
974
975 store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;
977
978 return Err(err.into());
980 }
981
982 let _ = self.room_linked_chunk.store_updates().take();
985
986 let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
988
989 Ok(match chunk_content {
990 ChunkContent::Gap(gap) => {
991 trace!("reloaded chunk from disk (gap)");
992 LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
993 }
994
995 ChunkContent::Items(events) => {
996 trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
997 LoadMoreEventsBackwardsOutcome::Events {
998 events,
999 timeline_event_diffs,
1000 reached_start,
1001 }
1002 }
1003 })
1004 }
1005
1006 pub(super) async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
1015 let store_lock = self.store.lock().await?;
1016
1017 let linked_chunk_id = LinkedChunkId::Room(&self.room);
1019 let (last_chunk, chunk_identifier_generator) =
1020 match store_lock.load_last_chunk(linked_chunk_id).await {
1021 Ok(pair) => pair,
1022
1023 Err(err) => {
1024 error!("error when reloading a linked chunk from memory: {err}");
1026
1027 store_lock
1029 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
1030 .await?;
1031
1032 (None, ChunkIdentifierGenerator::new_from_scratch())
1034 }
1035 };
1036
1037 debug!("unloading the linked chunk, and resetting it to its last chunk");
1038
1039 if let Err(err) =
1042 self.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
1043 {
1044 error!("error when replacing the linked chunk: {err}");
1045 return self.reset_internal().await;
1046 }
1047
1048 self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1052
1053 let _ = self.room_linked_chunk.store_updates().take();
1056
1057 Ok(())
1058 }
1059
1060 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1063 pub(crate) async fn auto_shrink_if_no_subscribers(
1064 &mut self,
1065 ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
1066 let subscriber_count = self.subscriber_count.load(std::sync::atomic::Ordering::SeqCst);
1067
1068 trace!(subscriber_count, "received request to auto-shrink");
1069
1070 if subscriber_count == 0 {
1071 self.shrink_to_last_chunk().await?;
1074 Ok(Some(self.room_linked_chunk.updates_as_vector_diffs()))
1075 } else {
1076 Ok(None)
1077 }
1078 }
1079
1080 #[cfg(test)]
1081 pub(crate) async fn force_shrink_to_last_chunk(
1082 &mut self,
1083 ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1084 self.shrink_to_last_chunk().await?;
1085 Ok(self.room_linked_chunk.updates_as_vector_diffs())
1086 }
1087
1088 pub(crate) fn room_event_order(&self, event_pos: Position) -> Option<usize> {
1089 self.room_linked_chunk.event_order(event_pos)
1090 }
1091
1092 fn strip_relations_if_present<T>(event: &mut Raw<T>) {
1096 let mut closure = || -> Option<()> {
1100 let mut val: serde_json::Value = event.deserialize_as().ok()?;
1101 let unsigned = val.get_mut("unsigned")?;
1102 let unsigned_obj = unsigned.as_object_mut()?;
1103 if unsigned_obj.remove("m.relations").is_some() {
1104 *event = Raw::new(&val).ok()?.cast_unchecked();
1105 }
1106 None
1107 };
1108 let _ = closure();
1109 }
1110
1111 fn strip_relations_from_event(ev: &mut Event) {
1112 match &mut ev.kind {
1113 TimelineEventKind::Decrypted(decrypted) => {
1114 decrypted.unsigned_encryption_info = None;
1117
1118 Self::strip_relations_if_present(&mut decrypted.event);
1120 }
1121
1122 TimelineEventKind::UnableToDecrypt { event, .. }
1123 | TimelineEventKind::PlainText { event } => {
1124 Self::strip_relations_if_present(event);
1125 }
1126 }
1127 }
1128
1129 fn strip_relations_from_events(items: &mut [Event]) {
1131 for ev in items.iter_mut() {
1132 Self::strip_relations_from_event(ev);
1133 }
1134 }
1135
1136 #[instrument(skip_all)]
1142 async fn remove_events(
1143 &mut self,
1144 in_memory_events: Vec<(OwnedEventId, Position)>,
1145 in_store_events: Vec<(OwnedEventId, Position)>,
1146 ) -> Result<(), EventCacheError> {
1147 if !in_store_events.is_empty() {
1149 let mut positions = in_store_events
1150 .into_iter()
1151 .map(|(_event_id, position)| position)
1152 .collect::<Vec<_>>();
1153
1154 sort_positions_descending(&mut positions);
1155
1156 let updates = positions
1157 .into_iter()
1158 .map(|pos| Update::RemoveItem { at: pos })
1159 .collect::<Vec<_>>();
1160
1161 self.apply_store_only_updates(updates).await?;
1162 }
1163
1164 if in_memory_events.is_empty() {
1166 return Ok(());
1168 }
1169
1170 self.room_linked_chunk
1172 .remove_events_by_position(
1173 in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
1174 )
1175 .expect("failed to remove an event");
1176
1177 self.propagate_changes().await
1178 }
1179
1180 async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1182 let updates = self.room_linked_chunk.store_updates().take();
1183 self.send_updates_to_store(updates).await
1184 }
1185
1186 async fn apply_store_only_updates(
1193 &mut self,
1194 updates: Vec<Update<Event, Gap>>,
1195 ) -> Result<(), EventCacheError> {
1196 self.room_linked_chunk.order_tracker.map_updates(&updates);
1197 self.send_updates_to_store(updates).await
1198 }
1199
1200 async fn send_updates_to_store(
1201 &mut self,
1202 mut updates: Vec<Update<Event, Gap>>,
1203 ) -> Result<(), EventCacheError> {
1204 if updates.is_empty() {
1205 return Ok(());
1206 }
1207
1208 for update in updates.iter_mut() {
1210 match update {
1211 Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
1212 Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
1213 Update::NewItemsChunk { .. }
1215 | Update::NewGapChunk { .. }
1216 | Update::RemoveChunk(_)
1217 | Update::RemoveItem { .. }
1218 | Update::DetachLastItems { .. }
1219 | Update::StartReattachItems
1220 | Update::EndReattachItems
1221 | Update::Clear => {}
1222 }
1223 }
1224
1225 let store = self.store.clone();
1232 let room_id = self.room.clone();
1233 let cloned_updates = updates.clone();
1234
1235 spawn(async move {
1236 let store = store.lock().await?;
1237
1238 trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
1239 let linked_chunk_id = LinkedChunkId::Room(&room_id);
1240 store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
1241 trace!("linked chunk updates applied");
1242
1243 super::Result::Ok(())
1244 })
1245 .await
1246 .expect("joining failed")?;
1247
1248 let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
1250 linked_chunk_id: OwnedLinkedChunkId::Room(self.room.clone()),
1251 updates,
1252 });
1253
1254 Ok(())
1255 }
1256
1257 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1263 pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1264 self.reset_internal().await?;
1265
1266 let diff_updates = self.room_linked_chunk.updates_as_vector_diffs();
1267
1268 debug_assert_eq!(diff_updates.len(), 1);
1270 debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1271
1272 Ok(diff_updates)
1273 }
1274
1275 async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
1276 self.room_linked_chunk.reset();
1277
1278 for thread in self.threads.values_mut() {
1284 thread.clear();
1285 }
1286
1287 self.propagate_changes().await?;
1288
1289 self.waited_for_initial_prev_token = false;
1293 self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1295
1296 Ok(())
1297 }
1298
1299 pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
1301 &self.room_linked_chunk
1302 }
1303
1304 pub fn rfind_map_event_in_memory_by<O, P>(&self, mut predicate: P) -> Option<O>
1310 where
1311 P: FnMut(&Event) -> Option<O>,
1312 {
1313 self.room_linked_chunk.revents().find_map(|(_position, event)| predicate(event))
1314 }
1315
1316 pub async fn find_event(
1321 &self,
1322 event_id: &EventId,
1323 ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1324 for (position, event) in self.room_linked_chunk.revents() {
1327 if event.event_id().as_deref() == Some(event_id) {
1328 return Ok(Some((EventLocation::Memory(position), event.clone())));
1329 }
1330 }
1331
1332 let store = self.store.lock().await?;
1333
1334 Ok(store
1335 .find_event(&self.room, event_id)
1336 .await?
1337 .map(|event| (EventLocation::Store, event)))
1338 }
1339
1340 pub async fn find_event_with_relations(
1354 &self,
1355 event_id: &EventId,
1356 filters: Option<Vec<RelationType>>,
1357 ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1358 let store = self.store.lock().await?;
1359
1360 let found = store.find_event(&self.room, event_id).await?;
1362
1363 let Some(target) = found else {
1364 return Ok(None);
1366 };
1367
1368 let mut related =
1371 store.find_event_relations(&self.room, event_id, filters.as_deref()).await?;
1372 let mut stack =
1373 related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();
1374
1375 let mut already_seen = HashSet::new();
1378 already_seen.insert(event_id.to_owned());
1379
1380 let mut num_iters = 1;
1381
1382 while let Some(event_id) = stack.pop() {
1384 if !already_seen.insert(event_id.clone()) {
1385 continue;
1387 }
1388
1389 let other_related =
1390 store.find_event_relations(&self.room, &event_id, filters.as_deref()).await?;
1391
1392 stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
1393 related.extend(other_related);
1394
1395 num_iters += 1;
1396 }
1397
1398 trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
1399
1400 related.sort_by(|(_, lhs), (_, rhs)| {
1404 use std::cmp::Ordering;
1405 match (lhs, rhs) {
1406 (None, None) => Ordering::Equal,
1407 (None, Some(_)) => Ordering::Less,
1408 (Some(_), None) => Ordering::Greater,
1409 (Some(lhs), Some(rhs)) => {
1410 let lhs = self.room_event_order(*lhs);
1411 let rhs = self.room_event_order(*rhs);
1412
1413 match (lhs, rhs) {
1417 (None, None) => Ordering::Equal,
1418 (None, Some(_)) => Ordering::Less,
1419 (Some(_), None) => Ordering::Greater,
1420 (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
1421 }
1422 }
1423 }
1424 });
1425
1426 let related = related.into_iter().map(|(event, _pos)| event).collect();
1428
1429 Ok(Some((target, related)))
1430 }
1431
1432 async fn post_process_new_events(
1437 &mut self,
1438 events: Vec<Event>,
1439 is_sync: bool,
1440 ) -> Result<(), EventCacheError> {
1441 self.propagate_changes().await?;
1443
1444 let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();
1445
1446 for event in events {
1447 self.maybe_apply_new_redaction(&event).await?;
1448
1449 if self.enabled_thread_support {
1450 if is_sync {
1455 if let Some(thread_root) = extract_thread_root(event.raw()) {
1456 new_events_by_thread
1457 .entry(thread_root)
1458 .or_default()
1459 .push(event.clone());
1460 } else if let Some(event_id) = event.event_id() {
1461 if self.threads.contains_key(&event_id) {
1463 new_events_by_thread
1464 .entry(event_id)
1465 .or_default()
1466 .push(event.clone());
1467 }
1468 }
1469 }
1470
1471 if let Some(edit_target) = extract_edit_target(event.raw()) {
1473 if let Some((_location, edit_target_event)) =
1475 self.find_event(&edit_target).await?
1476 && let Some(thread_root) = extract_thread_root(edit_target_event.raw())
1477 {
1478 new_events_by_thread.entry(thread_root).or_default();
1481 }
1482 }
1483 }
1484
1485 if let Some(bundled_thread) = event.bundled_latest_thread_event {
1487 self.save_events([*bundled_thread]).await?;
1488 }
1489 }
1490
1491 if self.enabled_thread_support {
1492 self.update_threads(new_events_by_thread).await?;
1493 }
1494
1495 Ok(())
1496 }
1497
1498 fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
1499 self.threads.entry(root_event_id.clone()).or_insert_with(|| {
1502 ThreadEventCache::new(
1503 self.room.clone(),
1504 root_event_id,
1505 self.linked_chunk_update_sender.clone(),
1506 )
1507 })
1508 }
1509
1510 #[instrument(skip_all)]
1511 async fn update_threads(
1512 &mut self,
1513 new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
1514 ) -> Result<(), EventCacheError> {
1515 for (thread_root, new_events) in new_events_by_thread {
1516 let thread_cache = self.get_or_reload_thread(thread_root.clone());
1517
1518 thread_cache.add_live_events(new_events);
1519
1520 let mut latest_event_id = thread_cache.latest_event_id();
1521
1522 if let Some(event_id) = latest_event_id.as_ref()
1525 && let Some((_, edits)) = self
1526 .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
1527 .await?
1528 && let Some(latest_edit) = edits.last()
1529 {
1530 latest_event_id = latest_edit.event_id();
1531 }
1532
1533 self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
1534 }
1535
1536 Ok(())
1537 }
1538
1539 async fn maybe_update_thread_summary(
1541 &mut self,
1542 thread_root: OwnedEventId,
1543 latest_event_id: Option<OwnedEventId>,
1544 ) -> Result<(), EventCacheError> {
1545 let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
1549 trace!(%thread_root, "thread root event is missing from the room linked chunk");
1550 return Ok(());
1551 };
1552
1553 let prev_summary = target_event.thread_summary.summary();
1554
1555 let num_replies = {
1564 let store_guard = &*self.store.lock().await?;
1565 let thread_replies = store_guard
1566 .find_event_relations(&self.room, &thread_root, Some(&[RelationType::Thread]))
1567 .await?;
1568 thread_replies.len().try_into().unwrap_or(u32::MAX)
1569 };
1570
1571 let new_summary = if num_replies > 0 {
1572 Some(ThreadSummary { num_replies, latest_reply: latest_event_id })
1573 } else {
1574 None
1575 };
1576
1577 if prev_summary == new_summary.as_ref() {
1578 trace!(%thread_root, "thread summary is already up-to-date");
1579 return Ok(());
1580 }
1581
1582 trace!(%thread_root, "updating thread summary: {new_summary:?}");
1584 target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary);
1585 self.replace_event_at(location, target_event).await
1586 }
1587
1588 async fn replace_event_at(
1595 &mut self,
1596 location: EventLocation,
1597 event: Event,
1598 ) -> Result<(), EventCacheError> {
1599 match location {
1600 EventLocation::Memory(position) => {
1601 self.room_linked_chunk
1602 .replace_event_at(position, event)
1603 .expect("should have been a valid position of an item");
1604 self.propagate_changes().await?;
1607 }
1608 EventLocation::Store => {
1609 self.save_events([event]).await?;
1610 }
1611 }
1612
1613 Ok(())
1614 }
1615
1616 #[instrument(skip_all)]
1620 async fn maybe_apply_new_redaction(
1621 &mut self,
1622 event: &Event,
1623 ) -> Result<(), EventCacheError> {
1624 let raw_event = event.raw();
1625
1626 let Ok(Some(MessageLikeEventType::RoomRedaction)) =
1629 raw_event.get_field::<MessageLikeEventType>("type")
1630 else {
1631 return Ok(());
1632 };
1633
1634 let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
1637 redaction,
1638 ))) = raw_event.deserialize()
1639 else {
1640 return Ok(());
1641 };
1642
1643 let Some(event_id) = redaction.redacts(&self.room_version_rules.redaction) else {
1644 warn!("missing target event id from the redaction event");
1645 return Ok(());
1646 };
1647
1648 let Some((location, mut target_event)) = self.find_event(event_id).await? else {
1650 trace!("redacted event is missing from the linked chunk");
1651 return Ok(());
1652 };
1653
1654 let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
1656 match deserialized {
1659 AnySyncTimelineEvent::MessageLike(ev) => {
1660 if ev.is_redacted() {
1661 return Ok(());
1662 }
1663 }
1664 AnySyncTimelineEvent::State(ev) => {
1665 if ev.is_redacted() {
1666 return Ok(());
1667 }
1668 }
1669 }
1670
1671 extract_thread_root(target_event.raw())
1674 } else {
1675 warn!("failed to deserialize the event to redact");
1676 None
1677 };
1678
1679 if let Some(redacted_event) = apply_redaction(
1680 target_event.raw(),
1681 event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
1682 &self.room_version_rules.redaction,
1683 ) {
1684 target_event.replace_raw(redacted_event.cast_unchecked());
1689
1690 self.replace_event_at(location, target_event).await?;
1691
1692 if let Some(thread_root) = thread_root
1700 && let Some(thread_cache) = self.threads.get_mut(&thread_root)
1701 {
1702 thread_cache.remove_if_present(event_id);
1703
1704 let latest_event_id = thread_cache.latest_event_id();
1707 self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
1708 }
1709 }
1710
1711 Ok(())
1712 }
1713
1714 pub async fn save_events(
1716 &self,
1717 events: impl IntoIterator<Item = Event>,
1718 ) -> Result<(), EventCacheError> {
1719 let store = self.store.clone();
1720 let room_id = self.room.clone();
1721 let events = events.into_iter().collect::<Vec<_>>();
1722
1723 spawn(async move {
1725 let store = store.lock().await?;
1726 for event in events {
1727 store.save_event(&room_id, event).await?;
1728 }
1729 super::Result::Ok(())
1730 })
1731 .await
1732 .expect("joining failed")?;
1733
1734 Ok(())
1735 }
1736
1737 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1745 pub async fn handle_sync(
1746 &mut self,
1747 mut timeline: Timeline,
1748 ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
1749 let mut prev_batch = timeline.prev_batch.take();
1750
1751 let DeduplicationOutcome {
1752 all_events: events,
1753 in_memory_duplicated_event_ids,
1754 in_store_duplicated_event_ids,
1755 non_empty_all_duplicates: all_duplicates,
1756 } = filter_duplicate_events(
1757 &self.store,
1758 LinkedChunkId::Room(self.room.as_ref()),
1759 &self.room_linked_chunk,
1760 timeline.events,
1761 )
1762 .await?;
1763
1764 if !timeline.limited && self.room_linked_chunk.events().next().is_some()
1777 || all_duplicates
1778 {
1779 prev_batch = None;
1780 }
1781
1782 if prev_batch.is_some() {
1783 let mut summaries_to_update = Vec::new();
1788
1789 for (thread_root, thread) in self.threads.iter_mut() {
1790 thread.clear();
1792
1793 summaries_to_update.push(thread_root.clone());
1794 }
1795
1796 for thread_root in summaries_to_update {
1800 let Some((location, mut target_event)) = self.find_event(&thread_root).await?
1801 else {
1802 trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
1803 continue;
1804 };
1805
1806 if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
1807 prev_summary.latest_reply = None;
1808
1809 target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);
1810
1811 self.replace_event_at(location, target_event).await?;
1812 }
1813 }
1814 }
1815
1816 if all_duplicates {
1817 return Ok((false, Vec::new()));
1820 }
1821
1822 let has_new_gap = prev_batch.is_some();
1823
1824 if !self.waited_for_initial_prev_token && has_new_gap {
1827 self.waited_for_initial_prev_token = true;
1828 }
1829
1830 self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1835 .await?;
1836
1837 self.room_linked_chunk
1838 .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);
1839
1840 self.post_process_new_events(events, true).await?;
1841
1842 if timeline.limited && has_new_gap {
1843 self.shrink_to_last_chunk().await?;
1850 }
1851
1852 let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
1853
1854 Ok((has_new_gap, timeline_event_diffs))
1855 }
1856
1857 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1865 pub async fn handle_backpagination(
1866 &mut self,
1867 events: Vec<Event>,
1868 mut new_token: Option<String>,
1869 prev_token: Option<String>,
1870 ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
1871 {
1872 let prev_gap_id = if let Some(token) = prev_token {
1875 let gap_chunk_id = self.room_linked_chunk.chunk_identifier(|chunk| {
1877 matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
1878 });
1879
1880 if gap_chunk_id.is_none() {
1881 return Ok(None);
1887 }
1888
1889 gap_chunk_id
1890 } else {
1891 None
1892 };
1893
1894 let DeduplicationOutcome {
1895 all_events: mut events,
1896 in_memory_duplicated_event_ids,
1897 in_store_duplicated_event_ids,
1898 non_empty_all_duplicates: all_duplicates,
1899 } = filter_duplicate_events(
1900 &self.store,
1901 LinkedChunkId::Room(self.room.as_ref()),
1902 &self.room_linked_chunk,
1903 events,
1904 )
1905 .await?;
1906
1907 if !all_duplicates {
1921 self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1923 .await?;
1924 } else {
1925 events.clear();
1927 new_token = None;
1930 }
1931
1932 let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();
1935
1936 let new_gap = new_token.map(|prev_token| Gap { prev_token });
1937 let reached_start = self.room_linked_chunk.finish_back_pagination(
1938 prev_gap_id,
1939 new_gap,
1940 &topo_ordered_events,
1941 );
1942
1943 self.post_process_new_events(topo_ordered_events, false).await?;
1945
1946 let event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
1947
1948 Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
1949 }
1950
1951 pub fn subscribe_to_thread(
1954 &mut self,
1955 root: OwnedEventId,
1956 ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
1957 self.get_or_reload_thread(root).subscribe()
1958 }
1959
1960 pub fn finish_thread_network_pagination(
1964 &mut self,
1965 root: OwnedEventId,
1966 prev_token: Option<String>,
1967 new_token: Option<String>,
1968 events: Vec<Event>,
1969 ) -> Option<BackPaginationOutcome> {
1970 self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
1971 }
1972
1973 pub fn load_more_thread_events_backwards(
1974 &mut self,
1975 root: OwnedEventId,
1976 ) -> LoadMoreEventsBackwardsOutcome {
1977 self.get_or_reload_thread(root).load_more_events_backwards()
1978 }
1979 }
1980}
1981
1982pub(super) enum EventLocation {
1984 Memory(Position),
1986
1987 Store,
1989}
1990
1991pub(super) use private::RoomEventCacheState;
1992
1993#[cfg(test)]
1994mod tests {
1995 use matrix_sdk_base::event_cache::Event;
1996 use matrix_sdk_test::{async_test, event_factory::EventFactory};
1997 use ruma::{
1998 RoomId, event_id,
1999 events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2000 room_id, user_id,
2001 };
2002
2003 use crate::test_utils::logged_in_client;
2004
2005 #[async_test]
2006 async fn test_find_event_by_id_with_edit_relation() {
2007 let original_id = event_id!("$original");
2008 let related_id = event_id!("$related");
2009 let room_id = room_id!("!galette:saucisse.bzh");
2010 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2011
2012 assert_relations(
2013 room_id,
2014 f.text_msg("Original event").event_id(original_id).into(),
2015 f.text_msg("* An edited event")
2016 .edit(
2017 original_id,
2018 RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
2019 )
2020 .event_id(related_id)
2021 .into(),
2022 f,
2023 )
2024 .await;
2025 }
2026
2027 #[async_test]
2028 async fn test_find_event_by_id_with_thread_reply_relation() {
2029 let original_id = event_id!("$original");
2030 let related_id = event_id!("$related");
2031 let room_id = room_id!("!galette:saucisse.bzh");
2032 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2033
2034 assert_relations(
2035 room_id,
2036 f.text_msg("Original event").event_id(original_id).into(),
2037 f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
2038 f,
2039 )
2040 .await;
2041 }
2042
2043 #[async_test]
2044 async fn test_find_event_by_id_with_reaction_relation() {
2045 let original_id = event_id!("$original");
2046 let related_id = event_id!("$related");
2047 let room_id = room_id!("!galette:saucisse.bzh");
2048 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2049
2050 assert_relations(
2051 room_id,
2052 f.text_msg("Original event").event_id(original_id).into(),
2053 f.reaction(original_id, ":D").event_id(related_id).into(),
2054 f,
2055 )
2056 .await;
2057 }
2058
2059 #[async_test]
2060 async fn test_find_event_by_id_with_poll_response_relation() {
2061 let original_id = event_id!("$original");
2062 let related_id = event_id!("$related");
2063 let room_id = room_id!("!galette:saucisse.bzh");
2064 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2065
2066 assert_relations(
2067 room_id,
2068 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2069 .event_id(original_id)
2070 .into(),
2071 f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
2072 f,
2073 )
2074 .await;
2075 }
2076
2077 #[async_test]
2078 async fn test_find_event_by_id_with_poll_end_relation() {
2079 let original_id = event_id!("$original");
2080 let related_id = event_id!("$related");
2081 let room_id = room_id!("!galette:saucisse.bzh");
2082 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2083
2084 assert_relations(
2085 room_id,
2086 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2087 .event_id(original_id)
2088 .into(),
2089 f.poll_end("Poll ended", original_id).event_id(related_id).into(),
2090 f,
2091 )
2092 .await;
2093 }
2094
2095 #[async_test]
2096 async fn test_find_event_by_id_with_filtered_relationships() {
2097 let original_id = event_id!("$original");
2098 let related_id = event_id!("$related");
2099 let associated_related_id = event_id!("$recursive_related");
2100 let room_id = room_id!("!galette:saucisse.bzh");
2101 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2102
2103 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2104 let related_event = event_factory
2105 .text_msg("* Edited event")
2106 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2107 .event_id(related_id)
2108 .into();
2109 let associated_related_event =
2110 event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
2111
2112 let client = logged_in_client(None).await;
2113
2114 let event_cache = client.event_cache();
2115 event_cache.subscribe().unwrap();
2116
2117 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2118 let room = client.get_room(room_id).unwrap();
2119
2120 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2121
2122 room_event_cache.save_events([original_event]).await;
2124
2125 room_event_cache.save_events([related_event]).await;
2127
2128 room_event_cache.save_events([associated_related_event]).await;
2130
2131 let filter = Some(vec![RelationType::Replacement]);
2132 let (event, related_events) =
2133 room_event_cache.find_event_with_relations(original_id, filter).await.unwrap();
2134 let cached_event_id = event.event_id().unwrap();
2136 assert_eq!(cached_event_id, original_id);
2137
2138 assert_eq!(related_events.len(), 1);
2140
2141 let related_event_id = related_events[0].event_id().unwrap();
2142 assert_eq!(related_event_id, related_id);
2143
2144 let filter = Some(vec![RelationType::Thread]);
2146 let (event, related_events) =
2147 room_event_cache.find_event_with_relations(original_id, filter).await.unwrap();
2148 let cached_event_id = event.event_id().unwrap();
2150 assert_eq!(cached_event_id, original_id);
2151 assert!(related_events.is_empty());
2153 }
2154
2155 #[async_test]
2156 async fn test_find_event_by_id_with_recursive_relation() {
2157 let original_id = event_id!("$original");
2158 let related_id = event_id!("$related");
2159 let associated_related_id = event_id!("$recursive_related");
2160 let room_id = room_id!("!galette:saucisse.bzh");
2161 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2162
2163 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2164 let related_event = event_factory
2165 .text_msg("* Edited event")
2166 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2167 .event_id(related_id)
2168 .into();
2169 let associated_related_event =
2170 event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
2171
2172 let client = logged_in_client(None).await;
2173
2174 let event_cache = client.event_cache();
2175 event_cache.subscribe().unwrap();
2176
2177 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2178 let room = client.get_room(room_id).unwrap();
2179
2180 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2181
2182 room_event_cache.save_events([original_event]).await;
2184
2185 room_event_cache.save_events([related_event]).await;
2187
2188 room_event_cache.save_events([associated_related_event]).await;
2190
2191 let (event, related_events) =
2192 room_event_cache.find_event_with_relations(original_id, None).await.unwrap();
2193 let cached_event_id = event.event_id().unwrap();
2195 assert_eq!(cached_event_id, original_id);
2196
2197 assert_eq!(related_events.len(), 2);
2199
2200 let related_event_id = related_events[0].event_id().unwrap();
2201 assert_eq!(related_event_id, related_id);
2202 let related_event_id = related_events[1].event_id().unwrap();
2203 assert_eq!(related_event_id, associated_related_id);
2204 }
2205
2206 async fn assert_relations(
2207 room_id: &RoomId,
2208 original_event: Event,
2209 related_event: Event,
2210 event_factory: EventFactory,
2211 ) {
2212 let client = logged_in_client(None).await;
2213
2214 let event_cache = client.event_cache();
2215 event_cache.subscribe().unwrap();
2216
2217 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2218 let room = client.get_room(room_id).unwrap();
2219
2220 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2221
2222 let original_event_id = original_event.event_id().unwrap();
2224 room_event_cache.save_events([original_event]).await;
2225
2226 let unrelated_id = event_id!("$2");
2228 room_event_cache
2229 .save_events([event_factory
2230 .text_msg("An unrelated event")
2231 .event_id(unrelated_id)
2232 .into()])
2233 .await;
2234
2235 let related_id = related_event.event_id().unwrap();
2237 room_event_cache.save_events([related_event]).await;
2238
2239 let (event, related_events) =
2240 room_event_cache.find_event_with_relations(&original_event_id, None).await.unwrap();
2241 let cached_event_id = event.event_id().unwrap();
2243 assert_eq!(cached_event_id, original_event_id);
2244
2245 let related_event_id = related_events[0].event_id().unwrap();
2247 assert_eq!(related_event_id, related_id);
2248 }
2249}
2250
2251#[cfg(all(test, not(target_family = "wasm")))] mod timed_tests {
2253 use std::sync::Arc;
2254
2255 use assert_matches::assert_matches;
2256 use assert_matches2::assert_let;
2257 use eyeball_im::VectorDiff;
2258 use futures_util::FutureExt;
2259 use matrix_sdk_base::{
2260 event_cache::{
2261 Gap,
2262 store::{EventCacheStore as _, MemoryStore},
2263 },
2264 linked_chunk::{
2265 ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
2266 lazy_loader::from_all_chunks,
2267 },
2268 store::StoreConfig,
2269 sync::{JoinedRoomUpdate, Timeline},
2270 };
2271 use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
2272 use ruma::{
2273 OwnedUserId, event_id,
2274 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2275 room_id, user_id,
2276 };
2277 use tokio::task::yield_now;
2278
2279 use super::RoomEventCacheGenericUpdate;
2280 use crate::{
2281 assert_let_timeout,
2282 event_cache::{RoomEventCacheUpdate, room::LoadMoreEventsBackwardsOutcome},
2283 test_utils::client::MockClientBuilder,
2284 };
2285
2286 #[async_test]
2287 async fn test_write_to_storage() {
2288 let room_id = room_id!("!galette:saucisse.bzh");
2289 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2290
2291 let event_cache_store = Arc::new(MemoryStore::new());
2292
2293 let client = MockClientBuilder::new(None)
2294 .on_builder(|builder| {
2295 builder.store_config(
2296 StoreConfig::new("hodlor".to_owned())
2297 .event_cache_store(event_cache_store.clone()),
2298 )
2299 })
2300 .build()
2301 .await;
2302
2303 let event_cache = client.event_cache();
2304
2305 event_cache.subscribe().unwrap();
2307
2308 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2309 let room = client.get_room(room_id).unwrap();
2310
2311 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2312 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2313
2314 let timeline = Timeline {
2316 limited: true,
2317 prev_batch: Some("raclette".to_owned()),
2318 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
2319 };
2320
2321 room_event_cache
2322 .inner
2323 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2324 .await
2325 .unwrap();
2326
2327 assert_matches!(
2329 generic_stream.recv().await,
2330 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2331 assert_eq!(expected_room_id, room_id);
2332 }
2333 );
2334
2335 let linked_chunk = from_all_chunks::<3, _, _>(
2337 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2338 )
2339 .unwrap()
2340 .unwrap();
2341
2342 assert_eq!(linked_chunk.chunks().count(), 2);
2343
2344 let mut chunks = linked_chunk.chunks();
2345
2346 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
2348 assert_eq!(gap.prev_token, "raclette");
2349 });
2350
2351 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2353 assert_eq!(events.len(), 1);
2354 let deserialized = events[0].raw().deserialize().unwrap();
2355 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
2356 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
2357 });
2358
2359 assert!(chunks.next().is_none());
2361 }
2362
2363 #[async_test]
2364 async fn test_write_to_storage_strips_bundled_relations() {
2365 let room_id = room_id!("!galette:saucisse.bzh");
2366 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2367
2368 let event_cache_store = Arc::new(MemoryStore::new());
2369
2370 let client = MockClientBuilder::new(None)
2371 .on_builder(|builder| {
2372 builder.store_config(
2373 StoreConfig::new("hodlor".to_owned())
2374 .event_cache_store(event_cache_store.clone()),
2375 )
2376 })
2377 .build()
2378 .await;
2379
2380 let event_cache = client.event_cache();
2381
2382 event_cache.subscribe().unwrap();
2384
2385 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2386 let room = client.get_room(room_id).unwrap();
2387
2388 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2389 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2390
2391 let ev = f
2393 .text_msg("hey yo")
2394 .sender(*ALICE)
2395 .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
2396 .into_event();
2397
2398 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
2399
2400 room_event_cache
2401 .inner
2402 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2403 .await
2404 .unwrap();
2405
2406 assert_matches!(
2408 generic_stream.recv().await,
2409 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2410 assert_eq!(expected_room_id, room_id);
2411 }
2412 );
2413
2414 {
2416 let events = room_event_cache.events().await;
2417
2418 assert_eq!(events.len(), 1);
2419
2420 let ev = events[0].raw().deserialize().unwrap();
2421 assert_let!(
2422 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
2423 );
2424
2425 let original = msg.as_original().unwrap();
2426 assert_eq!(original.content.body(), "hey yo");
2427 assert!(original.unsigned.relations.replace.is_some());
2428 }
2429
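// In the store, however, the bundled relation must have been stripped before persisting.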
2430 let linked_chunk = from_all_chunks::<3, _, _>(
2432 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2433 )
2434 .unwrap()
2435 .unwrap();
2436
2437 assert_eq!(linked_chunk.chunks().count(), 1);
2438
2439 let mut chunks = linked_chunk.chunks();
2440 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2441 assert_eq!(events.len(), 1);
2442
2443 let ev = events[0].raw().deserialize().unwrap();
2444 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
2445
2446 let original = msg.as_original().unwrap();
2447 assert_eq!(original.content.body(), "hey yo");
2448 assert!(original.unsigned.relations.replace.is_none());
2449 });
2450
2451 assert!(chunks.next().is_none());
2453 }
2454
2455 #[async_test]
2456 async fn test_clear() {
2457 let room_id = room_id!("!galette:saucisse.bzh");
2458 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2459
2460 let event_cache_store = Arc::new(MemoryStore::new());
2461
2462 let event_id1 = event_id!("$1");
2463 let event_id2 = event_id!("$2");
2464
2465 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2466 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2467
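// Prefill the store with a linked chunk: an empty items chunk, a gap, then two items chunks holding one event each.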
2468 event_cache_store
2470 .handle_linked_chunk_updates(
2471 LinkedChunkId::Room(room_id),
2472 vec![
2473 Update::NewItemsChunk {
2475 previous: None,
2476 new: ChunkIdentifier::new(0),
2477 next: None,
2478 },
2479 Update::NewGapChunk {
2481 previous: Some(ChunkIdentifier::new(0)),
2482 new: ChunkIdentifier::new(42),
2484 next: None,
2485 gap: Gap { prev_token: "comté".to_owned() },
2486 },
2487 Update::NewItemsChunk {
2489 previous: Some(ChunkIdentifier::new(42)),
2490 new: ChunkIdentifier::new(1),
2491 next: None,
2492 },
2493 Update::PushItems {
2494 at: Position::new(ChunkIdentifier::new(1), 0),
2495 items: vec![ev1.clone()],
2496 },
2497 Update::NewItemsChunk {
2499 previous: Some(ChunkIdentifier::new(1)),
2500 new: ChunkIdentifier::new(2),
2501 next: None,
2502 },
2503 Update::PushItems {
2504 at: Position::new(ChunkIdentifier::new(2), 0),
2505 items: vec![ev2.clone()],
2506 },
2507 ],
2508 )
2509 .await
2510 .unwrap();
2511
2512 let client = MockClientBuilder::new(None)
2513 .on_builder(|builder| {
2514 builder.store_config(
2515 StoreConfig::new("hodlor".to_owned())
2516 .event_cache_store(event_cache_store.clone()),
2517 )
2518 })
2519 .build()
2520 .await;
2521
2522 let event_cache = client.event_cache();
2523
2524 event_cache.subscribe().unwrap();
2526
2527 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2528 let room = client.get_room(room_id).unwrap();
2529
2530 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2531
2532 let (items, mut stream) = room_event_cache.subscribe().await;
2533 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2534
2535 {
2537 assert!(room_event_cache.find_event(event_id1).await.is_some());
2538 assert!(room_event_cache.find_event(event_id2).await.is_some());
2539 }
2540
2541 {
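// But only the most recent chunk is loaded at first: loading from the store is lazy.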
2543 assert_eq!(items.len(), 1);
2545 assert_eq!(items[0].event_id().unwrap(), event_id2);
2546
2547 assert!(stream.is_empty());
2548 }
2549
2550 {
2552 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2553
2554 assert_let_timeout!(
2555 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2556 );
2557 assert_eq!(diffs.len(), 1);
2558 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2559 assert_eq!(event.event_id().unwrap(), event_id1);
2561 });
2562
2563 assert!(stream.is_empty());
2564 }
2565
2566 room_event_cache.clear().await.unwrap();
2568
2569 assert_let_timeout!(
2571 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2572 );
2573 assert_eq!(diffs.len(), 1);
2574 assert_let!(VectorDiff::Clear = &diffs[0]);
2575
2576 assert_let_timeout!(
2578 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
2579 );
2580 assert_eq!(received_room_id, room_id);
2581
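// Clearing doesn't make the cache forget individual events (they can still be found by ID), but the in-memory timeline is now empty.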
2582 assert!(room_event_cache.find_event(event_id1).await.is_some());
2585
2586 let items = room_event_cache.events().await;
2588 assert!(items.is_empty());
2589
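// The store has been cleared too: the reloaded linked chunk contains no items.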
2590 let linked_chunk = from_all_chunks::<3, _, _>(
2592 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2593 )
2594 .unwrap()
2595 .unwrap();
2596
2597 assert_eq!(linked_chunk.num_items(), 0);
2601 }
2602
2603 #[async_test]
2604 async fn test_load_from_storage() {
2605 let room_id = room_id!("!galette:saucisse.bzh");
2606 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2607
2608 let event_cache_store = Arc::new(MemoryStore::new());
2609
2610 let event_id1 = event_id!("$1");
2611 let event_id2 = event_id!("$2");
2612
2613 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2614 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2615
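// Prefill the store with an empty items chunk, a gap, and two items chunks holding one event each.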
2616 event_cache_store
2618 .handle_linked_chunk_updates(
2619 LinkedChunkId::Room(room_id),
2620 vec![
2621 Update::NewItemsChunk {
2623 previous: None,
2624 new: ChunkIdentifier::new(0),
2625 next: None,
2626 },
2627 Update::NewGapChunk {
2629 previous: Some(ChunkIdentifier::new(0)),
2630 new: ChunkIdentifier::new(42),
2632 next: None,
2633 gap: Gap { prev_token: "cheddar".to_owned() },
2634 },
2635 Update::NewItemsChunk {
2637 previous: Some(ChunkIdentifier::new(42)),
2638 new: ChunkIdentifier::new(1),
2639 next: None,
2640 },
2641 Update::PushItems {
2642 at: Position::new(ChunkIdentifier::new(1), 0),
2643 items: vec![ev1.clone()],
2644 },
2645 Update::NewItemsChunk {
2647 previous: Some(ChunkIdentifier::new(1)),
2648 new: ChunkIdentifier::new(2),
2649 next: None,
2650 },
2651 Update::PushItems {
2652 at: Position::new(ChunkIdentifier::new(2), 0),
2653 items: vec![ev2.clone()],
2654 },
2655 ],
2656 )
2657 .await
2658 .unwrap();
2659
2660 let client = MockClientBuilder::new(None)
2661 .on_builder(|builder| {
2662 builder.store_config(
2663 StoreConfig::new("hodlor".to_owned())
2664 .event_cache_store(event_cache_store.clone()),
2665 )
2666 })
2667 .build()
2668 .await;
2669
2670 let event_cache = client.event_cache();
2671
2672 event_cache.subscribe().unwrap();
2674
2675 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2677
2678 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2679 let room = client.get_room(room_id).unwrap();
2680
2681 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2682
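// A generic update for the room is emitted once the room event cache has been set up from storage.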
2683 assert_matches!(
2686 generic_stream.recv().await,
2687 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2688 assert_eq!(room_id, expected_room_id);
2689 }
2690 );
2691
2692 let (items, mut stream) = room_event_cache.subscribe().await;
2693
2694 assert_eq!(items.len(), 1);
2697 assert_eq!(items[0].event_id().unwrap(), event_id2);
2698 assert!(stream.is_empty());
2699
2700 assert!(room_event_cache.find_event(event_id1).await.is_some());
2702 assert!(room_event_cache.find_event(event_id2).await.is_some());
2703
2704 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2706
2707 assert_let_timeout!(
2708 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2709 );
2710 assert_eq!(diffs.len(), 1);
2711 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2712 assert_eq!(event.event_id().unwrap(), event_id1);
2713 });
2714
2715 assert!(stream.is_empty());
2716
2717 assert_matches!(
2719 generic_stream.recv().await,
2720 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2721 assert_eq!(expected_room_id, room_id);
2722 }
2723 );
2724
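// Receive a sync update that only contains ev2, which is already known.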
2725 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
2727
2728 room_event_cache
2729 .inner
2730 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2731 .await
2732 .unwrap();
2733
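// The duplicate event brings nothing new, so no generic update is emitted.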
2734 assert!(generic_stream.recv().now_or_never().is_none());
2737
2738 let items = room_event_cache.events().await;
2743 assert_eq!(items.len(), 2);
2744 assert_eq!(items[0].event_id().unwrap(), event_id1);
2745 assert_eq!(items[1].event_id().unwrap(), event_id2);
2746 }
2747
2748 #[async_test]
2749 async fn test_load_from_storage_resilient_to_failure() {
2750 let room_id = room_id!("!fondue:patate.ch");
2751 let event_cache_store = Arc::new(MemoryStore::new());
2752
2753 let event = EventFactory::new()
2754 .room(room_id)
2755 .sender(user_id!("@ben:saucisse.bzh"))
2756 .text_msg("foo")
2757 .event_id(event_id!("$42"))
2758 .into_event();
2759
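// Prefill the store with invalid data: the second chunk points back at the first as its next chunk, so the linked chunk can't be rebuilt consistently.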
2760 event_cache_store
2762 .handle_linked_chunk_updates(
2763 LinkedChunkId::Room(room_id),
2764 vec![
2765 Update::NewItemsChunk {
2766 previous: None,
2767 new: ChunkIdentifier::new(0),
2768 next: None,
2769 },
2770 Update::PushItems {
2771 at: Position::new(ChunkIdentifier::new(0), 0),
2772 items: vec![event],
2773 },
2774 Update::NewItemsChunk {
2775 previous: Some(ChunkIdentifier::new(0)),
2776 new: ChunkIdentifier::new(1),
2777 next: Some(ChunkIdentifier::new(0)),
2778 },
2779 ],
2780 )
2781 .await
2782 .unwrap();
2783
2784 let client = MockClientBuilder::new(None)
2785 .on_builder(|builder| {
2786 builder.store_config(
2787 StoreConfig::new("holder".to_owned())
2788 .event_cache_store(event_cache_store.clone()),
2789 )
2790 })
2791 .build()
2792 .await;
2793
2794 let event_cache = client.event_cache();
2795
2796 event_cache.subscribe().unwrap();
2798
2799 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2800 let room = client.get_room(room_id).unwrap();
2801
2802 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2803
2804 let items = room_event_cache.events().await;
2805
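// Since the stored linked chunk couldn't be reloaded, the cache starts empty and the invalid data is wiped from the store.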
2806 assert!(items.is_empty());
2809
2810 let raw_chunks =
2813 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
2814 assert!(raw_chunks.is_empty());
2815 }
2816
2817 #[async_test]
2818 async fn test_no_useless_gaps() {
2819 let room_id = room_id!("!galette:saucisse.bzh");
2820
2821 let client = MockClientBuilder::new(None).build().await;
2822
2823 let event_cache = client.event_cache();
2824 event_cache.subscribe().unwrap();
2825
2826 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2827 let room = client.get_room(room_id).unwrap();
2828 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2829 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2830
2831 let f = EventFactory::new().room(room_id).sender(*ALICE);
2832
2833 room_event_cache
2836 .inner
2837 .handle_joined_room_update(JoinedRoomUpdate {
2838 timeline: Timeline {
2839 limited: true,
2840 prev_batch: Some("raclette".to_owned()),
2841 events: vec![f.text_msg("hey yo").into_event()],
2842 },
2843 ..Default::default()
2844 })
2845 .await
2846 .unwrap();
2847
2848 assert_matches!(
2850 generic_stream.recv().await,
2851 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2852 assert_eq!(expected_room_id, room_id);
2853 }
2854 );
2855
2856 {
2857 let mut state = room_event_cache.inner.state.write().await;
2858
2859 let mut num_gaps = 0;
2860 let mut num_events = 0;
2861
2862 for c in state.room_linked_chunk().chunks() {
2863 match c.content() {
2864 ChunkContent::Items(items) => num_events += items.len(),
2865 ChunkContent::Gap(_) => num_gaps += 1,
2866 }
2867 }
2868
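// Only the event is loaded in memory: the gap created for the prev-batch token isn't loaded yet.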
2869 assert_eq!(num_gaps, 0);
2872 assert_eq!(num_events, 1);
2873
2874 assert_matches!(
2876 state.load_more_events_backwards().await.unwrap(),
2877 LoadMoreEventsBackwardsOutcome::Gap { .. }
2878 );
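// Recount after loading one more chunk backwards: the gap is now in memory.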
2879
2880 num_gaps = 0;
2881 num_events = 0;
2882 for c in state.room_linked_chunk().chunks() {
2883 match c.content() {
2884 ChunkContent::Items(items) => num_events += items.len(),
2885 ChunkContent::Gap(_) => num_gaps += 1,
2886 }
2887 }
2888
2889 assert_eq!(num_gaps, 1);
2891 assert_eq!(num_events, 1);
2892 }
2893
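// Propagate a non-limited sync update that still carries a prev-batch token.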
2894 room_event_cache
2897 .inner
2898 .handle_joined_room_update(JoinedRoomUpdate {
2899 timeline: Timeline {
2900 limited: false,
2901 prev_batch: Some("fondue".to_owned()),
2902 events: vec![f.text_msg("sup").into_event()],
2903 },
2904 ..Default::default()
2905 })
2906 .await
2907 .unwrap();
2908
2909 assert_matches!(
2911 generic_stream.recv().await,
2912 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2913 assert_eq!(expected_room_id, room_id);
2914 }
2915 );
2916
2917 {
2918 let state = room_event_cache.inner.state.read().await;
2919
2920 let mut num_gaps = 0;
2921 let mut num_events = 0;
2922
2923 for c in state.room_linked_chunk().chunks() {
2924 match c.content() {
2925 ChunkContent::Items(items) => num_events += items.len(),
2926 ChunkContent::Gap(gap) => {
2927 assert_eq!(gap.prev_token, "raclette");
2928 num_gaps += 1;
2929 }
2930 }
2931 }
2932
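// Only the original "raclette" gap remains: the token from the non-limited sync didn't create a useless extra gap.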
2933 assert_eq!(num_gaps, 1);
2935 assert_eq!(num_events, 2);
2936 }
2937 }
2938
2939 #[async_test]
2940 async fn test_shrink_to_last_chunk() {
2941 let room_id = room_id!("!galette:saucisse.bzh");
2942
2943 let client = MockClientBuilder::new(None).build().await;
2944
2945 let f = EventFactory::new().room(room_id);
2946
2947 let evid1 = event_id!("$1");
2948 let evid2 = event_id!("$2");
2949
2950 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
2951 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
2952
2953 {
2955 let store = client.event_cache_store();
2956 let store = store.lock().await.unwrap();
2957 store
2958 .handle_linked_chunk_updates(
2959 LinkedChunkId::Room(room_id),
2960 vec![
2961 Update::NewItemsChunk {
2962 previous: None,
2963 new: ChunkIdentifier::new(0),
2964 next: None,
2965 },
2966 Update::PushItems {
2967 at: Position::new(ChunkIdentifier::new(0), 0),
2968 items: vec![ev1],
2969 },
2970 Update::NewItemsChunk {
2971 previous: Some(ChunkIdentifier::new(0)),
2972 new: ChunkIdentifier::new(1),
2973 next: None,
2974 },
2975 Update::PushItems {
2976 at: Position::new(ChunkIdentifier::new(1), 0),
2977 items: vec![ev2],
2978 },
2979 ],
2980 )
2981 .await
2982 .unwrap();
2983 }
2984
2985 let event_cache = client.event_cache();
2986 event_cache.subscribe().unwrap();
2987
2988 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2989 let room = client.get_room(room_id).unwrap();
2990 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2991
2992 let (events, mut stream) = room_event_cache.subscribe().await;
2994 assert_eq!(events.len(), 1);
2995 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
2996 assert!(stream.is_empty());
2997
2998 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2999
3000 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3002 assert_eq!(outcome.events.len(), 1);
3003 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3004 assert!(outcome.reached_start);
3005
3006 assert_let_timeout!(
3008 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3009 );
3010 assert_eq!(diffs.len(), 1);
3011 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3012 assert_eq!(value.event_id().as_deref(), Some(evid1));
3013 });
3014
3015 assert!(stream.is_empty());
3016
3017 assert_let_timeout!(
3019 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
3020 );
3021 assert_eq!(received_room_id, room_id);
3022
3023 let diffs = room_event_cache
3025 .inner
3026 .state
3027 .write()
3028 .await
3029 .force_shrink_to_last_chunk()
3030 .await
3031 .expect("shrinking should succeed");
3032
3033 assert_eq!(diffs.len(), 2);
3035 assert_matches!(&diffs[0], VectorDiff::Clear);
3036 assert_matches!(&diffs[1], VectorDiff::Append { values } => {
3037 assert_eq!(values.len(), 1);
3038 assert_eq!(values[0].event_id().as_deref(), Some(evid2));
3039 });
3040
3041 assert!(stream.is_empty());
3042
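// Shrinking doesn't emit a generic update either.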
3043 assert!(generic_stream.is_empty());
3045
3046 let events = room_event_cache.events().await;
3048 assert_eq!(events.len(), 1);
3049 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3050
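// The earlier events are still in the store, so paginating backwards finds them again.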
3051 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3054 assert_eq!(outcome.events.len(), 1);
3055 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3056 assert!(outcome.reached_start);
3057 }
3058
3059 #[async_test]
3060 async fn test_room_ordering() {
3061 let room_id = room_id!("!galette:saucisse.bzh");
3062
3063 let client = MockClientBuilder::new(None).build().await;
3064
3065 let f = EventFactory::new().room(room_id).sender(*ALICE);
3066
3067 let evid1 = event_id!("$1");
3068 let evid2 = event_id!("$2");
3069 let evid3 = event_id!("$3");
3070
3071 let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
3072 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3073 let ev3 = f.text_msg("yo").event_id(evid3).into_event();
3074
3075 {
3077 let store = client.event_cache_store();
3078 let store = store.lock().await.unwrap();
3079 store
3080 .handle_linked_chunk_updates(
3081 LinkedChunkId::Room(room_id),
3082 vec![
3083 Update::NewItemsChunk {
3084 previous: None,
3085 new: ChunkIdentifier::new(0),
3086 next: None,
3087 },
3088 Update::PushItems {
3089 at: Position::new(ChunkIdentifier::new(0), 0),
3090 items: vec![ev1, ev2],
3091 },
3092 Update::NewItemsChunk {
3093 previous: Some(ChunkIdentifier::new(0)),
3094 new: ChunkIdentifier::new(1),
3095 next: None,
3096 },
3097 Update::PushItems {
3098 at: Position::new(ChunkIdentifier::new(1), 0),
3099 items: vec![ev3.clone()],
3100 },
3101 ],
3102 )
3103 .await
3104 .unwrap();
3105 }
3106
3107 let event_cache = client.event_cache();
3108 event_cache.subscribe().unwrap();
3109
3110 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3111 let room = client.get_room(room_id).unwrap();
3112 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3113
3114 {
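// Only the last chunk is loaded in memory, yet the ordering covers positions from every chunk known to the store.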
3117 let state = room_event_cache.inner.state.read().await;
3118
3119 assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));
3121
3122 assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));
3124
3125 let mut events = state.room_linked_chunk().events();
3127 let (pos, ev) = events.next().unwrap();
3128 assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
3129 assert_eq!(ev.event_id().as_deref(), Some(evid3));
3130 assert_eq!(state.room_event_order(pos), Some(2));
3131
3132 assert!(events.next().is_none());
3134 }
3135
3136 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3138 assert!(outcome.reached_start);
3139
3140 {
3143 let state = room_event_cache.inner.state.read().await;
3144 for (i, (pos, _)) in state.room_linked_chunk().events().enumerate() {
3145 assert_eq!(state.room_event_order(pos), Some(i));
3146 }
3147 }
3148
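// Simulate a limited sync that carries ev3 (already known) together with a new event: ev3 should be deduplicated, not duplicated.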
3149 let evid4 = event_id!("$4");
3154 room_event_cache
3155 .inner
3156 .handle_joined_room_update(JoinedRoomUpdate {
3157 timeline: Timeline {
3158 limited: true,
3159 prev_batch: Some("fondue".to_owned()),
3160 events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
3161 },
3162 ..Default::default()
3163 })
3164 .await
3165 .unwrap();
3166
3167 {
3168 let state = room_event_cache.inner.state.read().await;
3169
3170 let mut events = state.room_linked_chunk().events();
3172
3173 let (pos, ev) = events.next().unwrap();
3174 assert_eq!(ev.event_id().as_deref(), Some(evid3));
3175 assert_eq!(state.room_event_order(pos), Some(2));
3176
3177 let (pos, ev) = events.next().unwrap();
3178 assert_eq!(ev.event_id().as_deref(), Some(evid4));
3179 assert_eq!(state.room_event_order(pos), Some(3));
3180
3181 assert!(events.next().is_none());
3183
3184 assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));
3186 assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));
3187
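// The position ev3 previously occupied (in chunk 1) no longer maps to an order, since the event moved to the new chunk.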
3188 assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(1), 0)), None);
3191 }
3192 }
3193
3194 #[async_test]
3195 async fn test_auto_shrink_after_all_subscribers_are_gone() {
3196 let room_id = room_id!("!galette:saucisse.bzh");
3197
3198 let client = MockClientBuilder::new(None).build().await;
3199
3200 let f = EventFactory::new().room(room_id);
3201
3202 let evid1 = event_id!("$1");
3203 let evid2 = event_id!("$2");
3204
3205 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3206 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3207
3208 {
3210 let store = client.event_cache_store();
3211 let store = store.lock().await.unwrap();
3212 store
3213 .handle_linked_chunk_updates(
3214 LinkedChunkId::Room(room_id),
3215 vec![
3216 Update::NewItemsChunk {
3217 previous: None,
3218 new: ChunkIdentifier::new(0),
3219 next: None,
3220 },
3221 Update::PushItems {
3222 at: Position::new(ChunkIdentifier::new(0), 0),
3223 items: vec![ev1],
3224 },
3225 Update::NewItemsChunk {
3226 previous: Some(ChunkIdentifier::new(0)),
3227 new: ChunkIdentifier::new(1),
3228 next: None,
3229 },
3230 Update::PushItems {
3231 at: Position::new(ChunkIdentifier::new(1), 0),
3232 items: vec![ev2],
3233 },
3234 ],
3235 )
3236 .await
3237 .unwrap();
3238 }
3239
3240 let event_cache = client.event_cache();
3241 event_cache.subscribe().unwrap();
3242
3243 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3244 let room = client.get_room(room_id).unwrap();
3245 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3246
3247 let (events1, mut stream1) = room_event_cache.subscribe().await;
3249 assert_eq!(events1.len(), 1);
3250 assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
3251 assert!(stream1.is_empty());
3252
3253 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3255 assert_eq!(outcome.events.len(), 1);
3256 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3257 assert!(outcome.reached_start);
3258
3259 assert_let_timeout!(
3262 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
3263 );
3264 assert_eq!(diffs.len(), 1);
3265 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3266 assert_eq!(value.event_id().as_deref(), Some(evid1));
3267 });
3268
3269 assert!(stream1.is_empty());
3270
3271 let (events2, stream2) = room_event_cache.subscribe().await;
3275 assert_eq!(events2.len(), 2);
3276 assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
3277 assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
3278 assert!(stream2.is_empty());
3279
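// Drop the first subscriber; the second one is still alive, so nothing shrinks yet.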
3280 drop(stream1);
3282 yield_now().await;
3283
3284 assert!(stream2.is_empty());
3286
3287 drop(stream2);
3289 yield_now().await;
3290
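// Both subscribers are gone: the subscriber count is back to zero, allowing the auto-shrink to happen.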
3291 {
3294 let state = room_event_cache.inner.state.read().await;
3296 assert_eq!(state.subscriber_count.load(std::sync::atomic::Ordering::SeqCst), 0);
3297 }
3298
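// After the auto-shrink, only the last chunk (the most recent event) remains loaded.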
3299 let events3 = room_event_cache.events().await;
3301 assert_eq!(events3.len(), 1);
3302 assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
3303 }
3304
3305 #[async_test]
3306 async fn test_rfind_map_event_in_memory_by() {
3307 let user_id = user_id!("@mnt_io:matrix.org");
3308 let room_id = room_id!("!raclette:patate.ch");
3309 let client = MockClientBuilder::new(None).build().await;
3310
3311 let event_factory = EventFactory::new().room(room_id);
3312
3313 let event_id_0 = event_id!("$ev0");
3314 let event_id_1 = event_id!("$ev1");
3315 let event_id_2 = event_id!("$ev2");
3316 let event_id_3 = event_id!("$ev3");
3317
3318 let event_0 =
3319 event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3320 let event_1 =
3321 event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3322 let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3323 let event_3 =
3324 event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3325
3326 {
3329 let store = client.event_cache_store();
3330 let store = store.lock().await.unwrap();
3331 store
3332 .handle_linked_chunk_updates(
3333 LinkedChunkId::Room(room_id),
3334 vec![
3335 Update::NewItemsChunk {
3336 previous: None,
3337 new: ChunkIdentifier::new(0),
3338 next: None,
3339 },
3340 Update::PushItems {
3341 at: Position::new(ChunkIdentifier::new(0), 0),
3342 items: vec![event_3],
3343 },
3344 Update::NewItemsChunk {
3345 previous: Some(ChunkIdentifier::new(0)),
3346 new: ChunkIdentifier::new(1),
3347 next: None,
3348 },
3349 Update::PushItems {
3350 at: Position::new(ChunkIdentifier::new(1), 0),
3351 items: vec![event_0, event_1, event_2],
3352 },
3353 ],
3354 )
3355 .await
3356 .unwrap();
3357 }
3358
3359 let event_cache = client.event_cache();
3360 event_cache.subscribe().unwrap();
3361
3362 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3363 let room = client.get_room(room_id).unwrap();
3364 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3365
3366 assert_matches!(
3368 room_event_cache
3369 .rfind_map_event_in_memory_by(|event| {
3370 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| event.event_id())
3371 })
3372 .await,
3373 Some(event_id) => {
3374 assert_eq!(event_id.as_deref(), Some(event_id_0));
3375 }
3376 );
3377
3378 assert_matches!(
3381 room_event_cache
3382 .rfind_map_event_in_memory_by(|event| {
3383 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| event.event_id())
3384 })
3385 .await,
3386 Some(event_id) => {
3387 assert_eq!(event_id.as_deref(), Some(event_id_2));
3388 }
3389 );
3390
3391 assert!(
3393 room_event_cache
3394 .rfind_map_event_in_memory_by(|event| {
3395 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
3396 == Some(user_id))
3397 .then(|| event.event_id())
3398 })
3399 .await
3400 .is_none()
3401 );
3402
3403 assert!(room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.is_none());
3405 }
3406}