use std::{
    collections::BTreeMap,
    fmt,
    ops::{Deref, DerefMut},
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};

use events::{sort_positions_descending, Gap};
use eyeball::SharedObservable;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
    deserialized_responses::{AmbiguityChange, TimelineEvent},
    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
    events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
    serde::Raw,
    EventId, OwnedEventId, OwnedRoomId,
};
use tokio::sync::{
    broadcast::{Receiver, Sender},
    mpsc, Notify, RwLock,
};
use tracing::{error, instrument, trace, warn};

use super::{
    deduplicator::DeduplicationOutcome, AllEventsCache, AutoShrinkChannelPayload, EventsOrigin,
    Result, RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
};
use crate::{client::WeakClient, room::WeakRoom};

pub(super) mod events;

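/// A cheap, cloneable handle to the event cache of a single room.
///
/// All clones share the same inner state behind an [`Arc`].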
#[derive(Clone)]
pub struct RoomEventCache {
    pub(super) inner: Arc<RoomEventCacheInner>,
}

impl fmt::Debug for RoomEventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RoomEventCache").finish_non_exhaustive()
    }
}

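/// A subscriber to a [`RoomEventCache`]: it wraps the broadcast receiver of
/// updates along with the bookkeeping needed to notify the auto-shrink channel
/// once the last listener goes away.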
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheListener {
    recv: Receiver<RoomEventCacheUpdate>,

    room_id: OwnedRoomId,

    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    listener_count: Arc<AtomicUsize>,
}

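// Dropping the last listener of a room notifies the auto-shrink channel, so
// the room's in-memory linked chunk can be shrunk down to its last chunk.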
impl Drop for RoomEventCacheListener {
    fn drop(&mut self) {
        let previous_listener_count = self.listener_count.fetch_sub(1, Ordering::SeqCst);

        trace!("dropping a room event cache listener; previous count: {previous_listener_count}");

        if previous_listener_count == 1 {
            let mut room_id = self.room_id.clone();

            let mut num_attempts = 0;

            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
                num_attempts += 1;

                if num_attempts > 1024 {
                    warn!("couldn't send notification to the auto-shrink channel after 1024 attempts; giving up");
                    return;
                }

                match err {
                    mpsc::error::TrySendError::Full(stolen_room_id) => {
                        room_id = stolen_room_id;
                    }
                    mpsc::error::TrySendError::Closed(_) => return,
                }
            }

            trace!("sent notification to the parent channel that we were the last listener");
        }
    }
}

impl Deref for RoomEventCacheListener {
    type Target = Receiver<RoomEventCacheUpdate>;

    fn deref(&self) -> &Self::Target {
        &self.recv
    }
}

impl DerefMut for RoomEventCacheListener {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.recv
    }
}

impl RoomEventCache {
    pub(super) fn new(
        client: WeakClient,
        state: RoomEventCacheState,
        pagination_status: SharedObservable<RoomPaginationStatus>,
        room_id: OwnedRoomId,
        all_events_cache: Arc<RwLock<AllEventsCache>>,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
    ) -> Self {
        Self {
            inner: Arc::new(RoomEventCacheInner::new(
                client,
                state,
                pagination_status,
                room_id,
                all_events_cache,
                auto_shrink_sender,
            )),
        }
    }

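    /// Subscribe to this room's updates, returning the current list of
    /// in-memory events along with a listener for subsequent updates.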
    pub async fn subscribe(&self) -> (Vec<TimelineEvent>, RoomEventCacheListener) {
        let state = self.inner.state.read().await;
        let events = state.events().events().map(|(_position, item)| item.clone()).collect();

        let previous_listener_count = state.listener_count.fetch_add(1, Ordering::SeqCst);
        trace!("added a room event cache listener; new count: {}", previous_listener_count + 1);

        let recv = self.inner.sender.subscribe();
        let listener = RoomEventCacheListener {
            recv,
            room_id: self.inner.room_id.clone(),
            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
            listener_count: state.listener_count.clone(),
        };

        (events, listener)
    }

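    /// Return a [`RoomPagination`] handle, to run back-paginations for this
    /// room.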
    pub fn pagination(&self) -> RoomPagination {
        RoomPagination { inner: self.inner.clone() }
    }

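    /// Try to find a single event by its id, first in this room's state, then
    /// in the shared `AllEventsCache`.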
    pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
        let Ok(maybe_position_and_event) = self.inner.state.read().await.find_event(event_id).await
        else {
            error!("Failed to find the event");

            return None;
        };

        if let Some(event) = maybe_position_and_event.map(|(_location, _position, event)| event) {
            Some(event)
        } else if let Some((room_id, event)) =
            self.inner.all_events.read().await.events.get(event_id).cloned()
        {
            (room_id == self.inner.room_id).then_some(event)
        } else {
            None
        }
    }

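    /// Try to find an event, along with the events related to it (optionally
    /// filtered by relation type), in this room's cache of all events.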
    pub async fn event_with_relations(
        &self,
        event_id: &EventId,
        filter: Option<Vec<RelationType>>,
    ) -> Option<(TimelineEvent, Vec<TimelineEvent>)> {
        let cache = self.inner.all_events.read().await;
        if let Some((_, event)) = cache.events.get(event_id) {
            let related_events = cache.collect_related_events(event_id, filter.as_deref());
            Some((event.clone(), related_events))
        } else {
            None
        }
    }

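    /// Clear all the events cached for this room, both in memory and in the
    /// storage if enabled, then forward the resulting diffs to listeners.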
    pub async fn clear(&self) -> Result<()> {
        let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;

        self.inner.all_events.write().await.clear();

        let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
            diffs: updates_as_vector_diffs,
            origin: EventsOrigin::Cache,
        });

        Ok(())
    }

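    /// Save a single event into the cache of all events, so it can be looked
    /// up later by its event id.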
    pub(crate) async fn save_event(&self, event: TimelineEvent) {
        if let Some(event_id) = event.event_id() {
            let mut cache = self.inner.all_events.write().await;

            cache.append_related_event(&event);
            cache.events.insert(event_id, (self.inner.room_id.clone(), event));
        } else {
            warn!("couldn't save event without event id in the event cache");
        }
    }

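    /// Save several events into the cache of all events, skipping those that
    /// have no event id.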
    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = TimelineEvent>) {
        let mut cache = self.inner.all_events.write().await;
        for event in events {
            if let Some(event_id) = event.event_id() {
                cache.append_related_event(&event);
                cache.events.insert(event_id, (self.inner.room_id.clone(), event));
            } else {
                warn!("couldn't save event without event id in the event cache");
            }
        }
    }

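    /// Return a textual representation (one line per chunk) of the underlying
    /// linked chunk, for debugging purposes.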
    pub async fn debug_string(&self) -> Vec<String> {
        self.inner.state.read().await.events().debug_string()
    }
}

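/// The inner, non-cloneable part of a [`RoomEventCache`], shared by all of its
/// clones through an [`Arc`].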
pub(super) struct RoomEventCacheInner {
    room_id: OwnedRoomId,

    pub weak_room: WeakRoom,

    pub sender: Sender<RoomEventCacheUpdate>,

    pub state: RwLock<RoomEventCacheState>,

    all_events: Arc<RwLock<AllEventsCache>>,

    pub pagination_batch_token_notifier: Notify,

    pub pagination_status: SharedObservable<RoomPaginationStatus>,

    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
}

impl RoomEventCacheInner {
    fn new(
        client: WeakClient,
        state: RoomEventCacheState,
        pagination_status: SharedObservable<RoomPaginationStatus>,
        room_id: OwnedRoomId,
        all_events_cache: Arc<RwLock<AllEventsCache>>,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
    ) -> Self {
        let sender = Sender::new(32);
        let weak_room = WeakRoom::new(client, room_id);
        Self {
            room_id: weak_room.room_id().to_owned(),
            weak_room,
            state: RwLock::new(state),
            all_events: all_events_cache,
            sender,
            pagination_batch_token_notifier: Default::default(),
            auto_shrink_sender,
            pagination_status,
        }
    }

    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
        if account_data.is_empty() {
            return;
        }

        let mut handled_read_marker = false;

        trace!("Handling account data");

        for raw_event in account_data {
            match raw_event.deserialize() {
                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
                    if handled_read_marker {
                        continue;
                    }

                    handled_read_marker = true;

                    let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
                        event_id: ev.content.event_id,
                    });
                }

                Ok(_) => {}

                Err(e) => {
                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
                    warn!(event_type, "Failed to deserialize account data: {e}");
                }
            }
        }
    }

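    /// Handle a sync update for a joined room: timeline events, ephemeral
    /// events, ambiguity changes and account data.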
    #[instrument(skip_all, fields(room_id = %self.room_id))]
    pub(super) async fn handle_joined_room_update(
        &self,
        has_storage: bool,
        updates: JoinedRoomUpdate,
    ) -> Result<()> {
        self.handle_timeline(
            has_storage,
            updates.timeline,
            updates.ephemeral.clone(),
            updates.ambiguity_changes,
        )
        .await?;

        self.handle_account_data(updates.account_data);

        Ok(())
    }

    async fn handle_timeline(
        &self,
        has_storage: bool,
        timeline: Timeline,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        if !has_storage && timeline.limited {
            trace!("limited timeline, clearing all previous events and pushing new events");

            self.replace_all_events_by(
                timeline.events,
                timeline.prev_batch,
                ephemeral_events,
                ambiguity_changes,
                EventsOrigin::Sync,
            )
            .await?;
        } else {
            trace!("adding new events");

            let prev_batch =
                if has_storage && !timeline.limited { None } else { timeline.prev_batch };

            let mut state = self.state.write().await;
            self.append_events_locked(
                &mut state,
                timeline.events,
                prev_batch,
                ephemeral_events,
                ambiguity_changes,
            )
            .await?;
        }

        Ok(())
    }

    #[instrument(skip_all, fields(room_id = %self.room_id))]
    pub(super) async fn handle_left_room_update(
        &self,
        has_storage: bool,
        updates: LeftRoomUpdate,
    ) -> Result<()> {
        self.handle_timeline(has_storage, updates.timeline, Vec::new(), updates.ambiguity_changes)
            .await?;
        Ok(())
    }

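    /// Remove all the stored events, then append the given set of events,
    /// notifying observers along the way.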
    pub(super) async fn replace_all_events_by(
        &self,
        timeline_events: Vec<TimelineEvent>,
        prev_batch: Option<String>,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
        events_origin: EventsOrigin,
    ) -> Result<()> {
        let mut state = self.state.write().await;

        let updates_as_vector_diffs = state.reset().await?;

        let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
            diffs: updates_as_vector_diffs,
            origin: events_origin,
        });

        self.append_events_locked(
            &mut state,
            timeline_events,
            prev_batch.clone(),
            ephemeral_events,
            ambiguity_changes,
        )
        .await?;

        Ok(())
    }

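    /// Append a set of events (and an optional gap carrying the previous-batch
    /// token) to the room's linked chunk, deduplicating against events already
    /// known, and notify observers. The caller provides the already-acquired
    /// write guard on the state.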
    async fn append_events_locked(
        &self,
        state: &mut RoomEventCacheState,
        timeline_events: Vec<TimelineEvent>,
        prev_batch: Option<String>,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        if timeline_events.is_empty()
            && prev_batch.is_none()
            && ephemeral_events.is_empty()
            && ambiguity_changes.is_empty()
        {
            return Ok(());
        }

        let (
            DeduplicationOutcome {
                all_events: events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
            },
            all_duplicates,
        ) = state.collect_valid_and_duplicated_events(timeline_events).await?;

        let timeline_event_diffs = if all_duplicates {
            vec![]
        } else {
            let mut timeline_event_diffs = state
                .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;

            let new_timeline_event_diffs = state
                .with_events_mut(|room_events| {
                    if !all_duplicates {
                        if let Some(prev_token) = &prev_batch {
                            let prev_chunk_to_remove =
                                room_events.rchunks().next().and_then(|chunk| {
                                    (chunk.is_items() && chunk.num_items() == 0)
                                        .then_some(chunk.identifier())
                                });

                            room_events.push_gap(Gap { prev_token: prev_token.clone() });

                            if let Some(prev_chunk_to_remove) = prev_chunk_to_remove {
                                room_events.remove_empty_chunk_at(prev_chunk_to_remove).expect(
                                    "we just checked the chunk is there, and it's an empty item chunk",
                                );
                            }
                        }
                    }

                    room_events.push_events(events.clone());

                    events.clone()
                })
                .await?;

            timeline_event_diffs.extend(new_timeline_event_diffs);

            if prev_batch.is_some() && !all_duplicates {
                if let Some(diffs) = state.shrink_to_last_chunk().await? {
                    timeline_event_diffs = diffs;
                }
            }

            {
                let mut all_events_cache = self.all_events.write().await;

                for event in events {
                    if let Some(event_id) = event.event_id() {
                        all_events_cache.append_related_event(&event);
                        all_events_cache
                            .events
                            .insert(event_id.to_owned(), (self.room_id.clone(), event));
                    }
                }
            }

            timeline_event_diffs
        };

        if prev_batch.is_some() {
            self.pagination_batch_token_notifier.notify_one();
        }

        {
            if !timeline_event_diffs.is_empty() {
                let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Sync,
                });
            }

            if !ephemeral_events.is_empty() {
                let _ = self
                    .sender
                    .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
            }

            if !ambiguity_changes.is_empty() {
                let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
            }
        }

        Ok(())
    }
}

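/// The result of a single attempt to load more events backwards, either from
/// memory or from the store.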
#[derive(Debug)]
pub(super) enum LoadMoreEventsBackwardsOutcome {
    Gap {
        prev_token: Option<String>,
    },

    StartOfTimeline,

    Events {
        events: Vec<TimelineEvent>,
        timeline_event_diffs: Vec<VectorDiff<TimelineEvent>>,
        reached_start: bool,
    },

    WaitForInitialPrevToken,
}

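// All the mutable state of a room's event cache lives in this private module,
// so that its fields can only be touched through the methods defined below.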
mod private {
    use std::sync::{atomic::AtomicUsize, Arc};

    use eyeball::SharedObservable;
    use eyeball_im::VectorDiff;
    use matrix_sdk_base::{
        apply_redaction,
        deserialized_responses::{TimelineEvent, TimelineEventKind},
        event_cache::{store::EventCacheStoreLock, Event, Gap},
        linked_chunk::{lazy_loader, ChunkContent, ChunkIdentifierGenerator, Position, Update},
    };
    use matrix_sdk_common::executor::spawn;
    use once_cell::sync::OnceCell;
    use ruma::{
        events::{
            room::redaction::SyncRoomRedactionEvent, AnySyncTimelineEvent, MessageLikeEventType,
        },
        serde::Raw,
        EventId, OwnedEventId, OwnedRoomId, RoomVersionId,
    };
    use tracing::{debug, error, instrument, trace, warn};

    use super::{
        super::{
            deduplicator::{DeduplicationOutcome, Deduplicator},
            EventCacheError,
        },
        events::RoomEvents,
        sort_positions_descending, EventLocation, LoadMoreEventsBackwardsOutcome,
    };
    use crate::event_cache::RoomPaginationStatus;

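    /// The state of a room's event cache: the in-memory linked chunk of
    /// events, the (optional) on-disk store, and the bookkeeping around them.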
    pub struct RoomEventCacheState {
        room: OwnedRoomId,

        room_version: RoomVersionId,

        store: Arc<OnceCell<EventCacheStoreLock>>,

        events: RoomEvents,

        deduplicator: Deduplicator,

        pub waited_for_initial_prev_token: bool,

        pagination_status: SharedObservable<RoomPaginationStatus>,

        pub(super) listener_count: Arc<AtomicUsize>,
    }

    impl RoomEventCacheState {
        pub async fn new(
            room_id: OwnedRoomId,
            room_version: RoomVersionId,
            store: Arc<OnceCell<EventCacheStoreLock>>,
            pagination_status: SharedObservable<RoomPaginationStatus>,
        ) -> Result<Self, EventCacheError> {
            let (events, deduplicator) = if let Some(store) = store.get() {
                let store_lock = store.lock().await?;

                let linked_chunk = match store_lock
                    .load_last_chunk(&room_id)
                    .await
                    .map_err(EventCacheError::from)
                    .and_then(|(last_chunk, chunk_identifier_generator)| {
                        lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
                            .map_err(EventCacheError::from)
                    }) {
                    Ok(linked_chunk) => linked_chunk,

                    Err(err) => {
                        error!("error when reloading a linked chunk from memory: {err}");

                        store_lock
                            .handle_linked_chunk_updates(&room_id, vec![Update::Clear])
                            .await?;

                        None
                    }
                };

                (
                    RoomEvents::with_initial_linked_chunk(linked_chunk),
                    Deduplicator::new_store_based(room_id.clone(), store.clone()),
                )
            } else {
                (RoomEvents::default(), Deduplicator::new_memory_based())
            };

            Ok(Self {
                room: room_id,
                room_version,
                store,
                events,
                deduplicator,
                waited_for_initial_prev_token: false,
                listener_count: Default::default(),
                pagination_status,
            })
        }

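        /// Deduplicate `events` against what's already in memory and in the
        /// store, and return the outcome along with a flag indicating whether
        /// *all* the new events were already known.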
        pub async fn collect_valid_and_duplicated_events(
            &mut self,
            events: Vec<Event>,
        ) -> Result<(DeduplicationOutcome, bool), EventCacheError> {
            let deduplication_outcome =
                self.deduplicator.filter_duplicate_events(events, &self.events).await?;

            let number_of_events = deduplication_outcome.all_events.len();
            let number_of_deduplicated_events =
                deduplication_outcome.in_memory_duplicated_event_ids.len()
                    + deduplication_outcome.in_store_duplicated_event_ids.len();

            let all_duplicates =
                number_of_events > 0 && number_of_events == number_of_deduplicated_events;

            Ok((deduplication_outcome, all_duplicates))
        }

        fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome {
            if self.events.events().next().is_some() {
                trace!("chunk is fully loaded and non-empty: reached_start=true");
                LoadMoreEventsBackwardsOutcome::StartOfTimeline
            } else if !self.waited_for_initial_prev_token {
                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken
            } else {
                LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
            }
        }

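        /// Load more events backwards, either from a gap already known in
        /// memory (returning its previous-batch token) or by lazily loading
        /// the previous chunk from the store.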
        pub(in super::super) async fn load_more_events_backwards(
            &mut self,
        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
            let Some(store) = self.store.get() else {
                if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
                    return Ok(LoadMoreEventsBackwardsOutcome::Gap {
                        prev_token: Some(prev_token),
                    });
                }

                return Ok(self.conclude_load_more_for_fully_loaded_chunk());
            };

            if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
            }

            let first_chunk_identifier =
                self.events.chunks().next().expect("a linked chunk is never empty").identifier();

            let store = store.lock().await?;

            let new_first_chunk =
                match store.load_previous_chunk(&self.room, first_chunk_identifier).await {
                    Ok(Some(new_first_chunk)) => new_first_chunk,

                    Ok(None) => {
                        return Ok(self.conclude_load_more_for_fully_loaded_chunk());
                    }

                    Err(err) => {
                        error!("error when loading the previous chunk of a linked chunk: {err}");

                        store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;

                        return Err(err.into());
                    }
                };

            let chunk_content = new_first_chunk.content.clone();

            let reached_start = new_first_chunk.previous.is_none();

            if let Err(err) = self.events.insert_new_chunk_as_first(new_first_chunk) {
                error!("error when inserting the previous chunk into its linked chunk: {err}");

                store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;

                return Err(err.into());
            };

            let _ = self.events.store_updates().take();

            let timeline_event_diffs = self.events.updates_as_vector_diffs();

            Ok(match chunk_content {
                ChunkContent::Gap(gap) => {
                    trace!("reloaded chunk from disk (gap)");
                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
                }

                ChunkContent::Items(events) => {
                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
                    LoadMoreEventsBackwardsOutcome::Events {
                        events,
                        timeline_event_diffs,
                        reached_start,
                    }
                }
            })
        }

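        /// Unload the in-memory linked chunk and reset it to its last chunk,
        /// reloaded from the store, returning the diffs to propagate to
        /// observers (or `None` if storage isn't enabled).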
        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
        pub(super) async fn shrink_to_last_chunk(
            &mut self,
        ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
            let Some(store) = self.store.get() else {
                return Ok(None);
            };

            let store_lock = store.lock().await?;

            let (last_chunk, chunk_identifier_generator) = match store_lock
                .load_last_chunk(&self.room)
                .await
            {
                Ok(pair) => pair,

                Err(err) => {
                    error!("error when reloading a linked chunk from memory: {err}");

                    store_lock.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;

                    (None, ChunkIdentifierGenerator::new_from_scratch())
                }
            };

            debug!("unloading the linked chunk, and resetting it to its last chunk");

            if let Err(err) = self.events.replace_with(last_chunk, chunk_identifier_generator) {
                error!("error when replacing the linked chunk: {err}");
                return self.reset().await.map(Some);
            }

            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            let _ = self.events.store_updates().take();

            let diffs = self.events.updates_as_vector_diffs();
            assert!(matches!(diffs[0], VectorDiff::Clear));

            Ok(Some(diffs))
        }

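        /// Shrink the linked chunk to its last chunk if, and only if, there
        /// are no more listeners subscribed to this room's updates.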
        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
        pub(crate) async fn auto_shrink_if_no_listeners(
            &mut self,
        ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
            let listener_count = self.listener_count.load(std::sync::atomic::Ordering::SeqCst);

            trace!(listener_count, "received request to auto-shrink");

            if listener_count == 0 {
                self.shrink_to_last_chunk().await
            } else {
                Ok(None)
            }
        }

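        /// Remove the bundled `m.relations` object from a raw event's
        /// `unsigned` field, if present.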
        fn strip_relations_if_present<T>(event: &mut Raw<T>) {
            let mut closure = || -> Option<()> {
                let mut val: serde_json::Value = event.deserialize_as().ok()?;
                let unsigned = val.get_mut("unsigned")?;
                let unsigned_obj = unsigned.as_object_mut()?;
                if unsigned_obj.remove("m.relations").is_some() {
                    *event = Raw::new(&val).ok()?.cast();
                }
                None
            };
            let _ = closure();
        }

        fn strip_relations_from_event(ev: &mut TimelineEvent) {
            match &mut ev.kind {
                TimelineEventKind::Decrypted(decrypted) => {
                    decrypted.unsigned_encryption_info = None;

                    Self::strip_relations_if_present(&mut decrypted.event);
                }

                TimelineEventKind::UnableToDecrypt { event, .. }
                | TimelineEventKind::PlainText { event } => {
                    Self::strip_relations_if_present(event);
                }
            }
        }

        fn strip_relations_from_events(items: &mut [TimelineEvent]) {
            for ev in items.iter_mut() {
                Self::strip_relations_from_event(ev);
            }
        }

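        /// Remove the given events (identified by their position) from memory
        /// and from the store.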
        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
        #[instrument(skip_all)]
        pub(crate) async fn remove_events(
            &mut self,
            in_memory_events: Vec<(OwnedEventId, Position)>,
            in_store_events: Vec<(OwnedEventId, Position)>,
        ) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
            if !in_store_events.is_empty() {
                let mut positions = in_store_events
                    .into_iter()
                    .map(|(_event_id, position)| position)
                    .collect::<Vec<_>>();

                sort_positions_descending(&mut positions);

                self.send_updates_to_store(
                    positions
                        .into_iter()
                        .map(|position| Update::RemoveItem { at: position })
                        .collect(),
                )
                .await?;
            }

            let timeline_event_diffs = if !in_memory_events.is_empty() {
                self.with_events_mut(|room_events| {
                    room_events
                        .remove_events_by_position(
                            in_memory_events
                                .into_iter()
                                .map(|(_event_id, position)| position)
                                .collect(),
                        )
                        .expect("failed to remove an event");

                    vec![]
                })
                .await?
            } else {
                Vec::new()
            };

            Ok(timeline_event_diffs)
        }

        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
            let updates = self.events.store_updates().take();
            self.send_updates_to_store(updates).await
        }

        pub async fn send_updates_to_store(
            &mut self,
            mut updates: Vec<Update<TimelineEvent, Gap>>,
        ) -> Result<(), EventCacheError> {
            let Some(store) = self.store.get() else {
                return Ok(());
            };

            if updates.is_empty() {
                return Ok(());
            }

            for update in updates.iter_mut() {
                match update {
                    Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
                    Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
                    Update::NewItemsChunk { .. }
                    | Update::NewGapChunk { .. }
                    | Update::RemoveChunk(_)
                    | Update::RemoveItem { .. }
                    | Update::DetachLastItems { .. }
                    | Update::StartReattachItems
                    | Update::EndReattachItems
                    | Update::Clear => {}
                }
            }

            let store = store.clone();
            let room_id = self.room.clone();

            spawn(async move {
                let store = store.lock().await?;

                trace!(?updates, "sending linked chunk updates to the store");
                store.handle_linked_chunk_updates(&room_id, updates).await?;
                trace!("linked chunk updates applied");

                super::Result::Ok(())
            })
            .await
            .expect("joining failed")?;

            Ok(())
        }

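        /// Reset this data structure as if it were brand new, clearing both
        /// the in-memory linked chunk and the store, and return the update as
        /// a single `Clear` diff.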
        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
            self.events.reset();

            self.propagate_changes().await?;

            self.waited_for_initial_prev_token = false;
            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            let diff_updates = self.events.updates_as_vector_diffs();

            debug_assert_eq!(diff_updates.len(), 1);
            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));

            Ok(diff_updates)
        }

        pub fn events(&self) -> &RoomEvents {
            &self.events
        }

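        /// Find a single event in this room, looking first in memory and then
        /// in the store, and return it along with where it was found.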
        pub async fn find_event(
            &self,
            event_id: &EventId,
        ) -> Result<Option<(EventLocation, Position, TimelineEvent)>, EventCacheError> {
            let room_id = self.room.as_ref();

            for (position, event) in self.events().revents() {
                if event.event_id().as_deref() == Some(event_id) {
                    return Ok(Some((EventLocation::Memory, position, event.clone())));
                }
            }

            let Some(store) = self.store.get() else {
                return Ok(None);
            };

            let store = store.lock().await?;

            Ok(store
                .find_event(room_id, event_id)
                .await?
                .map(|(position, event)| (EventLocation::Store, position, event)))
        }

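        /// Give temporary mutable access to the in-memory events via `func`,
        /// propagate the resulting changes to the store, post-process the
        /// events returned by the callback (e.g. apply redactions), and return
        /// the diffs to forward to observers.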
        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
        #[instrument(skip_all, fields(room_id = %self.room))]
        pub async fn with_events_mut<F>(
            &mut self,
            func: F,
        ) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError>
        where
            F: FnOnce(&mut RoomEvents) -> Vec<TimelineEvent>,
        {
            let events_to_post_process = func(&mut self.events);

            self.propagate_changes().await?;

            for event in &events_to_post_process {
                self.maybe_apply_new_redaction(event).await?;
            }

            if !self.waited_for_initial_prev_token
                && self.events.chunks().any(|chunk| chunk.is_gap())
            {
                self.waited_for_initial_prev_token = true;
            }

            let updates_as_vector_diffs = self.events.updates_as_vector_diffs();

            Ok(updates_as_vector_diffs)
        }

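        /// If the given event is a redaction, look for the redacted event in
        /// memory or in the store, and replace it with its redacted form.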
        #[instrument(skip_all)]
        async fn maybe_apply_new_redaction(
            &mut self,
            event: &Event,
        ) -> Result<(), EventCacheError> {
            let raw_event = event.raw();

            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
                raw_event.get_field::<MessageLikeEventType>("type")
            else {
                return Ok(());
            };

            let Ok(AnySyncTimelineEvent::MessageLike(
                ruma::events::AnySyncMessageLikeEvent::RoomRedaction(redaction),
            )) = event.raw().deserialize()
            else {
                return Ok(());
            };

            let Some(event_id) = redaction.redacts(&self.room_version) else {
                warn!("missing target event id from the redaction event");
                return Ok(());
            };

            if let Some((location, position, target_event)) = self.find_event(event_id).await? {
                if let Ok(deserialized) = target_event.raw().deserialize() {
                    match deserialized {
                        AnySyncTimelineEvent::MessageLike(ev) => {
                            if ev.is_redacted() {
                                return Ok(());
                            }
                        }
                        AnySyncTimelineEvent::State(ev) => {
                            if ev.is_redacted() {
                                return Ok(());
                            }
                        }
                    }
                }

                if let Some(redacted_event) = apply_redaction(
                    target_event.raw(),
                    event.raw().cast_ref::<SyncRoomRedactionEvent>(),
                    &self.room_version,
                ) {
                    let mut copy = target_event.clone();

                    copy.replace_raw(redacted_event.cast());

                    match location {
                        EventLocation::Memory => {
                            self.events
                                .replace_event_at(position, copy)
                                .expect("should have been a valid position of an item");
                        }
                        EventLocation::Store => {
                            self.send_updates_to_store(vec![Update::ReplaceItem {
                                at: position,
                                item: copy,
                            }])
                            .await?;
                        }
                    }
                }
            } else {
                trace!("redacted event is missing from the linked chunk");
            }

            Ok(())
        }
    }
}

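/// Where a previously-saved event has been found: in the in-memory linked
/// chunk, or in the on-disk store.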
pub(super) enum EventLocation {
    Memory,

    Store,
}

pub(super) use private::RoomEventCacheState;

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use assert_matches2::assert_let;
    use matrix_sdk_base::{
        event_cache::{
            store::{EventCacheStore as _, MemoryStore},
            Gap,
        },
        linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
        store::StoreConfig,
        sync::{JoinedRoomUpdate, Timeline},
    };
    use matrix_sdk_common::deserialized_responses::TimelineEvent;
    use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE, BOB};
    use ruma::{
        event_id,
        events::{
            relation::RelationType, room::message::RoomMessageEventContentWithoutRelation,
            AnySyncMessageLikeEvent, AnySyncTimelineEvent,
        },
        room_id, user_id, RoomId,
    };

    use crate::test_utils::{client::MockClientBuilder, logged_in_client};

    #[async_test]
    async fn test_event_with_redaction_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.redaction(original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_edit_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("* An edited event")
                .edit(
                    original_id,
                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
                )
                .event_id(related_id)
                .into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_reply_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("A reply").reply_to(original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_thread_reply_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_reaction_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.reaction(original_id, ":D").event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_poll_response_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_poll_end_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_filtered_relationships() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory =
            EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.redaction(related_id).event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        room_event_cache.save_event(original_event).await;

        room_event_cache.save_event(related_event).await;

        room_event_cache.save_event(associated_related_event).await;

        let filter = Some(vec![RelationType::Replacement]);
        let (event, related_events) =
            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);

        let filter = Some(vec![RelationType::Thread]);
        let (event, related_events) =
            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);
        assert!(related_events.is_empty());
    }

    #[async_test]
    async fn test_event_with_recursive_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory =
            EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.redaction(related_id).event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        room_event_cache.save_event(original_event).await;

        room_event_cache.save_event(related_event).await;

        room_event_cache.save_event(associated_related_event).await;

        let (event, related_events) =
            room_event_cache.event_with_relations(original_id, None).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
    async fn test_write_to_storage() {
        use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;

        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let timeline = Timeline {
            limited: true,
            prev_batch: Some("raclette".to_owned()),
            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
        };

        room_event_cache
            .inner
            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        let linked_chunk =
            from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
                .unwrap()
                .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 2);

        let mut chunks = linked_chunk.chunks();

        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });

        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            let deserialized = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
        });

        assert!(chunks.next().is_none());
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
    async fn test_write_to_storage_strips_bundled_relations() {
        use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
        use ruma::events::BundledMessageLikeRelations;

        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let mut relations = BundledMessageLikeRelations::new();
        relations.replace =
            Some(Box::new(f.text_msg("Hello, Kind Sir").sender(*ALICE).into_raw_sync()));
        let ev = f.text_msg("hey yo").sender(*ALICE).bundled_relations(relations).into_event();

        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };

        room_event_cache
            .inner
            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        {
            let (events, _) = room_event_cache.subscribe().await;

            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(
                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
            );

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_some());
        }

        let linked_chunk =
            from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
                .unwrap()
                .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 1);

        let mut chunks = linked_chunk.chunks();
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_none());
        });

        assert!(chunks.next().is_none());
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
    async fn test_clear() {
        use eyeball_im::VectorDiff;
        use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;

        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};

        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        event_cache_store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "comté".to_owned() },
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, mut stream) = room_event_cache.subscribe().await;

        {
            assert!(room_event_cache.event(event_id1).await.is_some());
            assert!(room_event_cache.event(event_id2).await.is_some());
        }

        {
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].event_id().unwrap(), event_id2);

            assert!(stream.is_empty());
        }

        {
            room_event_cache.pagination().run_backwards_once(20).await.unwrap();

            assert_let_timeout!(
                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
            );
            assert_eq!(diffs.len(), 1);
            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
                assert_eq!(event.event_id().unwrap(), event_id1);
            });

            assert!(stream.is_empty());
        }

        room_event_cache.clear().await.unwrap();

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_let!(VectorDiff::Clear = &diffs[0]);

        assert!(room_event_cache.event(event_id1).await.is_none());

        let (items, _) = room_event_cache.subscribe().await;
        assert!(items.is_empty());

        let linked_chunk =
            from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
                .unwrap()
                .unwrap();

        assert_eq!(linked_chunk.num_items(), 0);
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
    async fn test_load_from_storage() {
        use eyeball_im::VectorDiff;

        use super::RoomEventCacheUpdate;
        use crate::assert_let_timeout;

        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        event_cache_store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "cheddar".to_owned() },
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, mut stream) = room_event_cache.subscribe().await;

        assert_eq!(items.len(), 1);
        assert_eq!(items[0].event_id().unwrap(), event_id2);
        assert!(stream.is_empty());

        assert!(room_event_cache.event(event_id1).await.is_some());
        assert!(room_event_cache.event(event_id2).await.is_some());

        room_event_cache.pagination().run_backwards_once(20).await.unwrap();

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
            assert_eq!(event.event_id().unwrap(), event_id1);
        });

        assert!(stream.is_empty());

        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
        room_event_cache
            .inner
            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        let (items, _stream) = room_event_cache.subscribe().await;
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].event_id().unwrap(), event_id1);
        assert_eq!(items[1].event_id().unwrap(), event_id2);
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
    async fn test_load_from_storage_resilient_to_failure() {
        let room_id = room_id!("!fondue:patate.ch");
        let event_cache_store = Arc::new(MemoryStore::new());

        let event = EventFactory::new()
            .room(room_id)
            .sender(user_id!("@ben:saucisse.bzh"))
            .text_msg("foo")
            .event_id(event_id!("$42"))
            .into_event();

        event_cache_store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: Some(ChunkIdentifier::new(0)),
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("holder".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, _stream) = room_event_cache.subscribe().await;

        assert!(items.is_empty());

        let raw_chunks = event_cache_store.load_all_chunks(room_id).await.unwrap();
        assert!(raw_chunks.is_empty());
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
    async fn test_no_useless_gaps() {
        use crate::event_cache::room::LoadMoreEventsBackwardsOutcome;

        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let has_storage = true;
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let f = EventFactory::new().room(room_id).sender(*ALICE);

        room_event_cache
            .inner
            .handle_joined_room_update(
                has_storage,
                JoinedRoomUpdate {
                    timeline: Timeline {
                        limited: true,
                        prev_batch: Some("raclette".to_owned()),
                        events: vec![f.text_msg("hey yo").into_event()],
                    },
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        {
            let mut state = room_event_cache.inner.state.write().await;

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.events().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            assert_eq!(num_gaps, 0);
            assert_eq!(num_events, 1);

            assert_matches!(
                state.load_more_events_backwards().await.unwrap(),
                LoadMoreEventsBackwardsOutcome::Gap { .. }
            );

            num_gaps = 0;
            num_events = 0;
            for c in state.events().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 1);
        }

        room_event_cache
            .inner
            .handle_joined_room_update(
                has_storage,
                JoinedRoomUpdate {
                    timeline: Timeline {
                        limited: false,
                        prev_batch: Some("fondue".to_owned()),
                        events: vec![f.text_msg("sup").into_event()],
                    },
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        {
            let state = room_event_cache.inner.state.read().await;

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.events().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(gap) => {
                        assert_eq!(gap.prev_token, "raclette");
                        num_gaps += 1;
                    }
                }
            }

            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 2);
        }
    }

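    /// Saves an original event, an unrelated event and a related event into
    /// the cache, then checks that querying the original event's relations
    /// returns exactly the related event.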
    async fn assert_relations(
        room_id: &RoomId,
        original_event: TimelineEvent,
        related_event: TimelineEvent,
        event_factory: EventFactory,
    ) {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let original_event_id = original_event.event_id().unwrap();
        room_event_cache.save_event(original_event).await;

        let unrelated_id = event_id!("$2");
        room_event_cache
            .save_event(event_factory.text_msg("An unrelated event").event_id(unrelated_id).into())
            .await;

        let related_id = related_event.event_id().unwrap();
        room_event_cache.save_event(related_event).await;

        let (event, related_events) =
            room_event_cache.event_with_relations(&original_event_id, None).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_event_id);

        assert_eq!(related_events.len(), 1);
        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
    async fn test_shrink_to_last_chunk() {
        use eyeball_im::VectorDiff;

        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};

        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;

        let f = EventFactory::new().room(room_id);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();

        {
            let store = client.event_cache_store();
            let store = store.lock().await.unwrap();
            store
                .handle_linked_chunk_updates(
                    room_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (events, mut stream) = room_event_cache.subscribe().await;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
        assert!(stream.is_empty());

        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
            assert_eq!(value.event_id().as_deref(), Some(evid1));
        });

        assert!(stream.is_empty());

        let diffs = room_event_cache
            .inner
            .state
            .write()
            .await
            .shrink_to_last_chunk()
            .await
            .expect("shrinking should succeed")
            .unwrap();

        assert_eq!(diffs.len(), 2);
        assert_matches!(&diffs[0], VectorDiff::Clear);
        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
            assert_eq!(values.len(), 1);
            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
        });

        assert!(stream.is_empty());

        let (events, _) = room_event_cache.subscribe().await;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));

        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
    async fn test_auto_shrink_after_all_subscribers_are_gone() {
        use eyeball_im::VectorDiff;
        use tokio::task::yield_now;

        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};

        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;

        let f = EventFactory::new().room(room_id);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();

        {
            let store = client.event_cache_store();
            let store = store.lock().await.unwrap();
            store
                .handle_linked_chunk_updates(
                    room_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (events1, mut stream1) = room_event_cache.subscribe().await;
        assert_eq!(events1.len(), 1);
        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
        assert!(stream1.is_empty());

        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
            assert_eq!(value.event_id().as_deref(), Some(evid1));
        });

        assert!(stream1.is_empty());

        let (events2, stream2) = room_event_cache.subscribe().await;
        assert_eq!(events2.len(), 2);
        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
        assert!(stream2.is_empty());

        drop(stream1);
        yield_now().await;

        assert!(stream2.is_empty());

        drop(stream2);
        yield_now().await;

        {
            let state = room_event_cache.inner.state.read().await;
            assert_eq!(state.listener_count.load(std::sync::atomic::Ordering::SeqCst), 0);
        }

        let (events3, _stream2) = room_event_cache.subscribe().await;
        assert_eq!(events3.len(), 1);
        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
    }
}