1use std::{collections::BTreeMap, fmt, sync::Arc};
18
19use events::Gap;
20use eyeball_im::VectorDiff;
21use matrix_sdk_base::{
22 deserialized_responses::{AmbiguityChange, TimelineEvent},
23 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
24};
25use ruma::{
26 events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
27 serde::Raw,
28 EventId, OwnedEventId, OwnedRoomId, RoomVersionId,
29};
30use tokio::sync::{
31 broadcast::{Receiver, Sender},
32 Notify, RwLock,
33};
34use tracing::{trace, warn};
35
36use super::{
37 paginator::{Paginator, PaginatorState},
38 AllEventsCache, EventsOrigin, Result, RoomEventCacheUpdate, RoomPagination,
39};
40use crate::{client::WeakClient, room::WeakRoom};
41
42pub(super) mod events;
43
44#[derive(Clone)]
48pub struct RoomEventCache {
49 pub(super) inner: Arc<RoomEventCacheInner>,
50}
51
52impl fmt::Debug for RoomEventCache {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct("RoomEventCache").finish_non_exhaustive()
55 }
56}
57
58impl RoomEventCache {
59 pub(super) fn new(
61 client: WeakClient,
62 state: RoomEventCacheState,
63 room_id: OwnedRoomId,
64 room_version: RoomVersionId,
65 all_events_cache: Arc<RwLock<AllEventsCache>>,
66 ) -> Self {
67 Self {
68 inner: Arc::new(RoomEventCacheInner::new(
69 client,
70 state,
71 room_id,
72 room_version,
73 all_events_cache,
74 )),
75 }
76 }
77
78 pub async fn subscribe(&self) -> (Vec<TimelineEvent>, Receiver<RoomEventCacheUpdate>) {
81 let state = self.inner.state.read().await;
82 let events = state.events().events().map(|(_position, item)| item.clone()).collect();
83
84 (events, self.inner.sender.subscribe())
85 }
86
87 pub fn pagination(&self) -> RoomPagination {
90 RoomPagination { inner: self.inner.clone() }
91 }
92
93 pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
95 if let Some((room_id, event)) =
96 self.inner.all_events.read().await.events.get(event_id).cloned()
97 {
98 if room_id == self.inner.room_id {
99 return Some(event);
100 }
101 }
102
103 let state = self.inner.state.read().await;
104 for (_pos, event) in state.events().revents() {
105 if event.event_id().as_deref() == Some(event_id) {
106 return Some(event.clone());
107 }
108 }
109 None
110 }
111
112 pub async fn event_with_relations(
117 &self,
118 event_id: &EventId,
119 filter: Option<Vec<RelationType>>,
120 ) -> Option<(TimelineEvent, Vec<TimelineEvent>)> {
121 let cache = self.inner.all_events.read().await;
122 if let Some((_, event)) = cache.events.get(event_id) {
123 let related_events = cache.collect_related_events(event_id, filter.as_deref());
124 Some((event.clone(), related_events))
125 } else {
126 None
127 }
128 }
129
130 pub async fn clear(&self) -> Result<()> {
135 let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;
137
138 self.inner.all_events.write().await.clear();
140
141 let _ = self.inner.paginator.set_idle_state(PaginatorState::Initial, None, None);
144
145 let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
147 diffs: updates_as_vector_diffs,
148 origin: EventsOrigin::Sync,
149 });
150
151 Ok(())
152 }
153
154 pub(crate) async fn save_event(&self, event: TimelineEvent) {
160 if let Some(event_id) = event.event_id() {
161 let mut cache = self.inner.all_events.write().await;
162
163 cache.append_related_event(&event);
164 cache.events.insert(event_id, (self.inner.room_id.clone(), event));
165 } else {
166 warn!("couldn't save event without event id in the event cache");
167 }
168 }
169
170 pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = TimelineEvent>) {
177 let mut cache = self.inner.all_events.write().await;
178 for event in events {
179 if let Some(event_id) = event.event_id() {
180 cache.append_related_event(&event);
181 cache.events.insert(event_id, (self.inner.room_id.clone(), event));
182 } else {
183 warn!("couldn't save event without event id in the event cache");
184 }
185 }
186 }
187
188 pub async fn debug_string(&self) -> Vec<String> {
191 self.inner.state.read().await.events().debug_string()
192 }
193}
194
195pub(super) struct RoomEventCacheInner {
197 room_id: OwnedRoomId,
199
200 pub(crate) room_version: RoomVersionId,
202
203 pub sender: Sender<RoomEventCacheUpdate>,
205
206 pub state: RwLock<RoomEventCacheState>,
208
209 all_events: Arc<RwLock<AllEventsCache>>,
214
215 pub pagination_batch_token_notifier: Notify,
217
218 pub paginator: Paginator<WeakRoom>,
227}
228
229impl RoomEventCacheInner {
230 fn new(
233 client: WeakClient,
234 state: RoomEventCacheState,
235 room_id: OwnedRoomId,
236 room_version: RoomVersionId,
237 all_events_cache: Arc<RwLock<AllEventsCache>>,
238 ) -> Self {
239 let sender = Sender::new(32);
240 let weak_room = WeakRoom::new(client, room_id);
241 Self {
242 room_id: weak_room.room_id().to_owned(),
243 room_version,
244 state: RwLock::new(state),
245 all_events: all_events_cache,
246 sender,
247 pagination_batch_token_notifier: Default::default(),
248 paginator: Paginator::new(weak_room),
249 }
250 }
251
252 fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
253 if account_data.is_empty() {
254 return;
255 }
256
257 let mut handled_read_marker = false;
258
259 trace!("Handling account data");
260
261 for raw_event in account_data {
262 match raw_event.deserialize() {
263 Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
264 if handled_read_marker {
267 continue;
268 }
269
270 handled_read_marker = true;
271
272 let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
274 event_id: ev.content.event_id,
275 });
276 }
277
278 Ok(_) => {
279 }
282
283 Err(e) => {
284 let event_type = raw_event.get_field::<String>("type").ok().flatten();
285 warn!(event_type, "Failed to deserialize account data: {e}");
286 }
287 }
288 }
289 }
290
291 pub(super) async fn handle_joined_room_update(
292 &self,
293 has_storage: bool,
294 updates: JoinedRoomUpdate,
295 ) -> Result<()> {
296 self.handle_timeline(
297 has_storage,
298 updates.timeline,
299 updates.ephemeral.clone(),
300 updates.ambiguity_changes,
301 )
302 .await?;
303
304 self.handle_account_data(updates.account_data);
305
306 Ok(())
307 }
308
309 async fn handle_timeline(
310 &self,
311 has_storage: bool,
312 timeline: Timeline,
313 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
314 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
315 ) -> Result<()> {
316 if !has_storage && timeline.limited {
317 trace!("limited timeline, clearing all previous events and pushing new events");
321
322 self.replace_all_events_by(
323 timeline.events,
324 timeline.prev_batch,
325 ephemeral_events,
326 ambiguity_changes,
327 )
328 .await?;
329 } else {
330 trace!("adding new events");
332
333 let prev_batch =
341 if has_storage && !timeline.limited { None } else { timeline.prev_batch };
342
343 let mut state = self.state.write().await;
344 self.append_events_locked(
345 &mut state,
346 timeline.events,
347 prev_batch,
348 ephemeral_events,
349 ambiguity_changes,
350 )
351 .await?;
352 }
353
354 Ok(())
355 }
356
357 pub(super) async fn handle_left_room_update(
358 &self,
359 has_storage: bool,
360 updates: LeftRoomUpdate,
361 ) -> Result<()> {
362 self.handle_timeline(has_storage, updates.timeline, Vec::new(), updates.ambiguity_changes)
363 .await?;
364 Ok(())
365 }
366
367 pub(super) async fn replace_all_events_by(
370 &self,
371 sync_timeline_events: Vec<TimelineEvent>,
372 prev_batch: Option<String>,
373 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
374 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
375 ) -> Result<()> {
376 let mut state = self.state.write().await;
378
379 let updates_as_vector_diffs = state.reset().await?;
381
382 let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
384 diffs: updates_as_vector_diffs,
385 origin: EventsOrigin::Sync,
386 });
387
388 self.append_events_locked(
390 &mut state,
391 sync_timeline_events,
392 prev_batch.clone(),
393 ephemeral_events,
394 ambiguity_changes,
395 )
396 .await?;
397
398 self.paginator.set_idle_state(PaginatorState::Initial, prev_batch, None)?;
400
401 Ok(())
402 }
403
404 async fn append_events_locked(
409 &self,
410 state: &mut RoomEventCacheState,
411 sync_timeline_events: Vec<TimelineEvent>,
412 prev_batch: Option<String>,
413 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
414 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
415 ) -> Result<()> {
416 if sync_timeline_events.is_empty()
417 && prev_batch.is_none()
418 && ephemeral_events.is_empty()
419 && ambiguity_changes.is_empty()
420 {
421 return Ok(());
422 }
423
424 let (events, duplicated_event_ids, all_duplicates) =
425 state.collect_valid_and_duplicated_events(sync_timeline_events.clone()).await?;
426
427 let sync_timeline_events_diffs = if all_duplicates {
433 vec![]
435 } else {
436 let (_, sync_timeline_events_diffs) = state
439 .with_events_mut(|room_events| {
440 if let Some(prev_token) = &prev_batch {
441 room_events.push_gap(Gap { prev_token: prev_token.clone() });
442 }
443
444 room_events.remove_events_by_id(duplicated_event_ids);
450
451 room_events.push_events(events.clone());
453
454 room_events.on_new_events(&self.room_version, events.iter());
455 })
456 .await?;
457
458 {
459 let mut all_events = self.all_events.write().await;
461 for sync_timeline_event in sync_timeline_events {
462 if let Some(event_id) = sync_timeline_event.event_id() {
463 all_events.append_related_event(&sync_timeline_event);
464 all_events.events.insert(
465 event_id.to_owned(),
466 (self.room_id.clone(), sync_timeline_event),
467 );
468 }
469 }
470 }
471
472 sync_timeline_events_diffs
473 };
474
475 if prev_batch.is_some() {
478 self.pagination_batch_token_notifier.notify_one();
479 }
480
481 {
483 if !sync_timeline_events_diffs.is_empty() {
484 let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
485 diffs: sync_timeline_events_diffs,
486 origin: EventsOrigin::Sync,
487 });
488 }
489
490 if !ephemeral_events.is_empty() {
491 let _ = self
492 .sender
493 .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
494 }
495
496 if !ambiguity_changes.is_empty() {
497 let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
498 }
499 }
500
501 Ok(())
502 }
503}
504
505#[derive(Debug)]
508pub(super) enum LoadMoreEventsBackwardsOutcome {
509 Gap,
511
512 StartOfTimeline,
514
515 Events(Vec<TimelineEvent>, Vec<VectorDiff<TimelineEvent>>),
517}
518
519mod private {
521 use std::sync::Arc;
522
523 use eyeball_im::VectorDiff;
524 use matrix_sdk_base::{
525 deserialized_responses::{TimelineEvent, TimelineEventKind},
526 event_cache::{store::EventCacheStoreLock, Event},
527 linked_chunk::{lazy_loader, ChunkContent, Update},
528 };
529 use matrix_sdk_common::executor::spawn;
530 use once_cell::sync::OnceCell;
531 use ruma::{serde::Raw, OwnedEventId, OwnedRoomId};
532 use tracing::{error, instrument, trace};
533
534 use super::{events::RoomEvents, LoadMoreEventsBackwardsOutcome};
535 use crate::event_cache::{deduplicator::Deduplicator, EventCacheError};
536
537 pub struct RoomEventCacheState {
542 room: OwnedRoomId,
544
545 store: Arc<OnceCell<EventCacheStoreLock>>,
550
551 events: RoomEvents,
553
554 deduplicator: Deduplicator,
556
557 pub waited_for_initial_prev_token: bool,
562 }
563
564 impl RoomEventCacheState {
565 pub async fn new(
575 room_id: OwnedRoomId,
576 store: Arc<OnceCell<EventCacheStoreLock>>,
577 ) -> Result<Self, EventCacheError> {
578 let (events, deduplicator) = if let Some(store) = store.get() {
579 let store_lock = store.lock().await?;
580
581 let linked_chunk = match store_lock
582 .load_last_chunk(&room_id)
583 .await
584 .map_err(EventCacheError::from)
585 .and_then(|(last_chunk, chunk_identifier_generator)| {
586 lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
587 .map_err(EventCacheError::from)
588 }) {
589 Ok(linked_chunk) => linked_chunk,
590
591 Err(err) => {
592 error!("error when reloading a linked chunk from memory: {err}");
593
594 store_lock
596 .handle_linked_chunk_updates(&room_id, vec![Update::Clear])
597 .await?;
598
599 None
601 }
602 };
603
604 (
605 RoomEvents::with_initial_linked_chunk(linked_chunk),
606 Deduplicator::new_store_based(room_id.clone(), store.clone()),
607 )
608 } else {
609 (RoomEvents::default(), Deduplicator::new_memory_based())
610 };
611
612 Ok(Self {
613 room: room_id,
614 store,
615 events,
616 deduplicator,
617 waited_for_initial_prev_token: false,
618 })
619 }
620
621 pub async fn collect_valid_and_duplicated_events(
648 &mut self,
649 events: Vec<Event>,
650 ) -> Result<(Vec<Event>, Vec<OwnedEventId>, bool), EventCacheError> {
651 let (events, duplicated_event_ids) =
652 self.deduplicator.filter_duplicate_events(events, &self.events).await?;
653
654 let all_duplicates = !events.is_empty() && events.len() == duplicated_event_ids.len();
655
656 Ok((events, duplicated_event_ids, all_duplicates))
657 }
658
659 #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
661 pub(in super::super) async fn load_more_events_backwards(
662 &mut self,
663 ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
664 let Some(store) = self.store.get() else {
665 return Ok(LoadMoreEventsBackwardsOutcome::Gap);
668 };
669
670 if self.events.chunks().any(|chunk| chunk.is_gap()) {
673 return Ok(LoadMoreEventsBackwardsOutcome::Gap);
674 }
675
676 let first_chunk_identifier =
679 self.events.chunks().next().expect("a linked chunk is never empty").identifier();
680
681 let room_id = &self.room;
682 let store = store.lock().await?;
683
684 let new_first_chunk =
686 match store.load_previous_chunk(room_id, first_chunk_identifier).await {
687 Ok(Some(new_first_chunk)) => {
688 new_first_chunk
690 }
691 Ok(None) => {
692 return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
695 }
696 Err(err) => {
697 error!("error when loading the previous chunk of a linked chunk: {err}");
698
699 store.handle_linked_chunk_updates(room_id, vec![Update::Clear]).await?;
701
702 return Err(err.into());
704 }
705 };
706
707 let events = match &new_first_chunk.content {
708 ChunkContent::Gap(_) => None,
709 ChunkContent::Items(events) => Some(events.clone()),
710 };
711
712 if let Err(err) = self.events.insert_new_chunk_as_first(new_first_chunk) {
713 error!("error when inserting the previous chunk into its linked chunk: {err}");
714
715 store.handle_linked_chunk_updates(room_id, vec![Update::Clear]).await?;
717
718 return Err(err.into());
720 };
721
722 let _ = self.events.updates().take();
725
726 let updates_as_vector_diffs = self.events.updates_as_vector_diffs();
728
729 Ok(match events {
730 None => LoadMoreEventsBackwardsOutcome::Gap,
731 Some(events) => {
732 LoadMoreEventsBackwardsOutcome::Events(events, updates_as_vector_diffs)
733 }
734 })
735 }
736 fn strip_relations_if_present<T>(event: &mut Raw<T>) {
740 let mut closure = || -> Option<()> {
744 let mut val: serde_json::Value = event.deserialize_as().ok()?;
745 let unsigned = val.get_mut("unsigned")?;
746 let unsigned_obj = unsigned.as_object_mut()?;
747 if unsigned_obj.remove("m.relations").is_some() {
748 *event = Raw::new(&val).ok()?.cast();
749 }
750 None
751 };
752 let _ = closure();
753 }
754
755 fn strip_relations_from_event(ev: &mut TimelineEvent) {
756 match &mut ev.kind {
757 TimelineEventKind::Decrypted(decrypted) => {
758 decrypted.unsigned_encryption_info = None;
761
762 Self::strip_relations_if_present(&mut decrypted.event);
764 }
765
766 TimelineEventKind::UnableToDecrypt { event, .. }
767 | TimelineEventKind::PlainText { event } => {
768 Self::strip_relations_if_present(event);
769 }
770 }
771 }
772
773 fn strip_relations_from_events(items: &mut [TimelineEvent]) {
775 for ev in items.iter_mut() {
776 Self::strip_relations_from_event(ev);
777 }
778 }
779
780 #[instrument(skip_all)]
782 async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
783 let mut updates = self.events.updates().take();
784
785 if updates.is_empty() {
786 return Ok(());
787 }
788
789 let Some(store) = self.store.get() else {
790 return Ok(());
791 };
792
793 trace!("propagating {} updates", updates.len());
794
795 for update in updates.iter_mut() {
797 match update {
798 Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
799 Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
800 Update::NewItemsChunk { .. }
802 | Update::NewGapChunk { .. }
803 | Update::RemoveChunk(_)
804 | Update::RemoveItem { .. }
805 | Update::DetachLastItems { .. }
806 | Update::StartReattachItems
807 | Update::EndReattachItems
808 | Update::Clear => {}
809 }
810 }
811
812 let store = store.clone();
819 let room_id = self.room.clone();
820
821 spawn(async move {
822 let store = store.lock().await?;
823
824 if let Err(err) = store.handle_linked_chunk_updates(&room_id, updates).await {
825 error!("unable to handle linked chunk updates: {err}");
826 }
827
828 super::Result::Ok(())
829 })
830 .await
831 .expect("joining failed")?;
832
833 trace!("done propagating store changes");
834
835 Ok(())
836 }
837
838 #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
840 pub async fn reset(&mut self) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
841 self.events.reset();
842 self.propagate_changes().await?;
843 self.waited_for_initial_prev_token = false;
844
845 Ok(self.events.updates_as_vector_diffs())
846 }
847
848 pub fn events(&self) -> &RoomEvents {
850 &self.events
851 }
852
853 #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
860 pub async fn with_events_mut<O, F: FnOnce(&mut RoomEvents) -> O>(
861 &mut self,
862 func: F,
863 ) -> Result<(O, Vec<VectorDiff<TimelineEvent>>), EventCacheError> {
864 let output = func(&mut self.events);
865 self.propagate_changes().await?;
866 let updates_as_vector_diffs = self.events.updates_as_vector_diffs();
867 Ok((output, updates_as_vector_diffs))
868 }
869 }
870}
871
872pub(super) use private::RoomEventCacheState;
873
874#[cfg(test)]
875mod tests {
876 use std::sync::Arc;
877
878 use assert_matches::assert_matches;
879 use assert_matches2::assert_let;
880 use matrix_sdk_base::{
881 event_cache::{
882 store::{EventCacheStore as _, MemoryStore},
883 Gap,
884 },
885 linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
886 store::StoreConfig,
887 sync::{JoinedRoomUpdate, Timeline},
888 };
889 use matrix_sdk_common::deserialized_responses::TimelineEvent;
890 use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE, BOB};
891 use ruma::{
892 event_id,
893 events::{
894 relation::RelationType, room::message::RoomMessageEventContentWithoutRelation,
895 AnySyncMessageLikeEvent, AnySyncTimelineEvent,
896 },
897 room_id, user_id, RoomId,
898 };
899
900 use crate::test_utils::{client::MockClientBuilder, logged_in_client};
901
902 #[async_test]
903 async fn test_event_with_redaction_relation() {
904 let original_id = event_id!("$original");
905 let related_id = event_id!("$related");
906 let room_id = room_id!("!galette:saucisse.bzh");
907 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
908
909 assert_relations(
910 room_id,
911 f.text_msg("Original event").event_id(original_id).into(),
912 f.redaction(original_id).event_id(related_id).into(),
913 f,
914 )
915 .await;
916 }
917
918 #[async_test]
919 async fn test_event_with_edit_relation() {
920 let original_id = event_id!("$original");
921 let related_id = event_id!("$related");
922 let room_id = room_id!("!galette:saucisse.bzh");
923 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
924
925 assert_relations(
926 room_id,
927 f.text_msg("Original event").event_id(original_id).into(),
928 f.text_msg("* An edited event")
929 .edit(
930 original_id,
931 RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
932 )
933 .event_id(related_id)
934 .into(),
935 f,
936 )
937 .await;
938 }
939
940 #[async_test]
941 async fn test_event_with_reply_relation() {
942 let original_id = event_id!("$original");
943 let related_id = event_id!("$related");
944 let room_id = room_id!("!galette:saucisse.bzh");
945 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
946
947 assert_relations(
948 room_id,
949 f.text_msg("Original event").event_id(original_id).into(),
950 f.text_msg("A reply").reply_to(original_id).event_id(related_id).into(),
951 f,
952 )
953 .await;
954 }
955
956 #[async_test]
957 async fn test_event_with_thread_reply_relation() {
958 let original_id = event_id!("$original");
959 let related_id = event_id!("$related");
960 let room_id = room_id!("!galette:saucisse.bzh");
961 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
962
963 assert_relations(
964 room_id,
965 f.text_msg("Original event").event_id(original_id).into(),
966 f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
967 f,
968 )
969 .await;
970 }
971
972 #[async_test]
973 async fn test_event_with_reaction_relation() {
974 let original_id = event_id!("$original");
975 let related_id = event_id!("$related");
976 let room_id = room_id!("!galette:saucisse.bzh");
977 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
978
979 assert_relations(
980 room_id,
981 f.text_msg("Original event").event_id(original_id).into(),
982 f.reaction(original_id, ":D").event_id(related_id).into(),
983 f,
984 )
985 .await;
986 }
987
988 #[async_test]
989 async fn test_event_with_poll_response_relation() {
990 let original_id = event_id!("$original");
991 let related_id = event_id!("$related");
992 let room_id = room_id!("!galette:saucisse.bzh");
993 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
994
995 assert_relations(
996 room_id,
997 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
998 .event_id(original_id)
999 .into(),
1000 f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
1001 f,
1002 )
1003 .await;
1004 }
1005
1006 #[async_test]
1007 async fn test_event_with_poll_end_relation() {
1008 let original_id = event_id!("$original");
1009 let related_id = event_id!("$related");
1010 let room_id = room_id!("!galette:saucisse.bzh");
1011 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1012
1013 assert_relations(
1014 room_id,
1015 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
1016 .event_id(original_id)
1017 .into(),
1018 f.poll_end("Poll ended", original_id).event_id(related_id).into(),
1019 f,
1020 )
1021 .await;
1022 }
1023
1024 #[async_test]
1025 async fn test_event_with_filtered_relationships() {
1026 let original_id = event_id!("$original");
1027 let related_id = event_id!("$related");
1028 let associated_related_id = event_id!("$recursive_related");
1029 let room_id = room_id!("!galette:saucisse.bzh");
1030 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1031
1032 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
1033 let related_event = event_factory
1034 .text_msg("* Edited event")
1035 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
1036 .event_id(related_id)
1037 .into();
1038 let associated_related_event =
1039 event_factory.redaction(related_id).event_id(associated_related_id).into();
1040
1041 let client = logged_in_client(None).await;
1042
1043 let event_cache = client.event_cache();
1044 event_cache.subscribe().unwrap();
1045
1046 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1047 let room = client.get_room(room_id).unwrap();
1048
1049 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1050
1051 room_event_cache.save_event(original_event).await;
1053
1054 room_event_cache.save_event(related_event).await;
1056
1057 room_event_cache.save_event(associated_related_event).await;
1059
1060 let filter = Some(vec![RelationType::Replacement]);
1061 let (event, related_events) =
1062 room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1063 let cached_event_id = event.event_id().unwrap();
1065 assert_eq!(cached_event_id, original_id);
1066
1067 assert_eq!(related_events.len(), 2);
1069
1070 let related_event_id = related_events[0].event_id().unwrap();
1071 assert_eq!(related_event_id, related_id);
1072 let related_event_id = related_events[1].event_id().unwrap();
1073 assert_eq!(related_event_id, associated_related_id);
1074
1075 let filter = Some(vec![RelationType::Thread]);
1077 let (event, related_events) =
1078 room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1079 let cached_event_id = event.event_id().unwrap();
1081 assert_eq!(cached_event_id, original_id);
1082 assert!(related_events.is_empty());
1084 }
1085
1086 #[async_test]
1087 async fn test_event_with_recursive_relation() {
1088 let original_id = event_id!("$original");
1089 let related_id = event_id!("$related");
1090 let associated_related_id = event_id!("$recursive_related");
1091 let room_id = room_id!("!galette:saucisse.bzh");
1092 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1093
1094 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
1095 let related_event = event_factory
1096 .text_msg("* Edited event")
1097 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
1098 .event_id(related_id)
1099 .into();
1100 let associated_related_event =
1101 event_factory.redaction(related_id).event_id(associated_related_id).into();
1102
1103 let client = logged_in_client(None).await;
1104
1105 let event_cache = client.event_cache();
1106 event_cache.subscribe().unwrap();
1107
1108 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1109 let room = client.get_room(room_id).unwrap();
1110
1111 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1112
1113 room_event_cache.save_event(original_event).await;
1115
1116 room_event_cache.save_event(related_event).await;
1118
1119 room_event_cache.save_event(associated_related_event).await;
1121
1122 let (event, related_events) =
1123 room_event_cache.event_with_relations(original_id, None).await.unwrap();
1124 let cached_event_id = event.event_id().unwrap();
1126 assert_eq!(cached_event_id, original_id);
1127
1128 assert_eq!(related_events.len(), 2);
1130
1131 let related_event_id = related_events[0].event_id().unwrap();
1132 assert_eq!(related_event_id, related_id);
1133 let related_event_id = related_events[1].event_id().unwrap();
1134 assert_eq!(related_event_id, associated_related_id);
1135 }
1136
1137 #[cfg(not(target_arch = "wasm32"))] #[async_test]
1139 async fn test_write_to_storage() {
1140 use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1141
1142 let room_id = room_id!("!galette:saucisse.bzh");
1143 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1144
1145 let event_cache_store = Arc::new(MemoryStore::new());
1146
1147 let client = MockClientBuilder::new("http://localhost".to_owned())
1148 .store_config(
1149 StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1150 )
1151 .build()
1152 .await;
1153
1154 let event_cache = client.event_cache();
1155
1156 event_cache.subscribe().unwrap();
1158 event_cache.enable_storage().unwrap();
1159
1160 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1161 let room = client.get_room(room_id).unwrap();
1162
1163 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1164
1165 let timeline = Timeline {
1167 limited: true,
1168 prev_batch: Some("raclette".to_owned()),
1169 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
1170 };
1171
1172 room_event_cache
1173 .inner
1174 .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1175 .await
1176 .unwrap();
1177
1178 let linked_chunk =
1179 from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1180 .unwrap()
1181 .unwrap();
1182
1183 assert_eq!(linked_chunk.chunks().count(), 3);
1184
1185 let mut chunks = linked_chunk.chunks();
1186
1187 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1189 assert_eq!(events.len(), 0)
1190 });
1191
1192 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
1194 assert_eq!(gap.prev_token, "raclette");
1195 });
1196
1197 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1199 assert_eq!(events.len(), 1);
1200 let deserialized = events[0].raw().deserialize().unwrap();
1201 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
1202 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
1203 });
1204
1205 assert!(chunks.next().is_none());
1207 }
1208
1209 #[cfg(not(target_arch = "wasm32"))] #[async_test]
1211 async fn test_write_to_storage_strips_bundled_relations() {
1212 use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1213 use ruma::events::BundledMessageLikeRelations;
1214
1215 let room_id = room_id!("!galette:saucisse.bzh");
1216 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1217
1218 let event_cache_store = Arc::new(MemoryStore::new());
1219
1220 let client = MockClientBuilder::new("http://localhost".to_owned())
1221 .store_config(
1222 StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1223 )
1224 .build()
1225 .await;
1226
1227 let event_cache = client.event_cache();
1228
1229 event_cache.subscribe().unwrap();
1231 event_cache.enable_storage().unwrap();
1232
1233 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1234 let room = client.get_room(room_id).unwrap();
1235
1236 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1237
1238 let mut relations = BundledMessageLikeRelations::new();
1240 relations.replace =
1241 Some(Box::new(f.text_msg("Hello, Kind Sir").sender(*ALICE).into_raw_sync()));
1242 let ev = f.text_msg("hey yo").sender(*ALICE).bundled_relations(relations).into_event();
1243
1244 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
1245
1246 room_event_cache
1247 .inner
1248 .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1249 .await
1250 .unwrap();
1251
1252 {
1254 let (events, _) = room_event_cache.subscribe().await;
1255
1256 assert_eq!(events.len(), 1);
1257
1258 let ev = events[0].raw().deserialize().unwrap();
1259 assert_let!(
1260 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
1261 );
1262
1263 let original = msg.as_original().unwrap();
1264 assert_eq!(original.content.body(), "hey yo");
1265 assert!(original.unsigned.relations.replace.is_some());
1266 }
1267
1268 let linked_chunk =
1270 from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1271 .unwrap()
1272 .unwrap();
1273
1274 assert_eq!(linked_chunk.chunks().count(), 1);
1275
1276 let mut chunks = linked_chunk.chunks();
1277 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1278 assert_eq!(events.len(), 1);
1279
1280 let ev = events[0].raw().deserialize().unwrap();
1281 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
1282
1283 let original = msg.as_original().unwrap();
1284 assert_eq!(original.content.body(), "hey yo");
1285 assert!(original.unsigned.relations.replace.is_none());
1286 });
1287
1288 assert!(chunks.next().is_none());
1290 }
1291
1292 #[cfg(not(target_arch = "wasm32"))] #[async_test]
1294 async fn test_clear() {
1295 use eyeball_im::VectorDiff;
1296 use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1297
1298 use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
1299
1300 let room_id = room_id!("!galette:saucisse.bzh");
1301 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1302
1303 let event_cache_store = Arc::new(MemoryStore::new());
1304
1305 let event_id1 = event_id!("$1");
1306 let event_id2 = event_id!("$2");
1307
1308 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1309 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1310
1311 event_cache_store
1313 .handle_linked_chunk_updates(
1314 room_id,
1315 vec![
1316 Update::NewItemsChunk {
1318 previous: None,
1319 new: ChunkIdentifier::new(0),
1320 next: None,
1321 },
1322 Update::NewGapChunk {
1324 previous: Some(ChunkIdentifier::new(0)),
1325 new: ChunkIdentifier::new(42),
1327 next: None,
1328 gap: Gap { prev_token: "comté".to_owned() },
1329 },
1330 Update::NewItemsChunk {
1332 previous: Some(ChunkIdentifier::new(42)),
1333 new: ChunkIdentifier::new(1),
1334 next: None,
1335 },
1336 Update::PushItems {
1337 at: Position::new(ChunkIdentifier::new(1), 0),
1338 items: vec![ev1.clone()],
1339 },
1340 Update::NewItemsChunk {
1342 previous: Some(ChunkIdentifier::new(1)),
1343 new: ChunkIdentifier::new(2),
1344 next: None,
1345 },
1346 Update::PushItems {
1347 at: Position::new(ChunkIdentifier::new(2), 0),
1348 items: vec![ev2.clone()],
1349 },
1350 ],
1351 )
1352 .await
1353 .unwrap();
1354
1355 let client = MockClientBuilder::new("http://localhost".to_owned())
1356 .store_config(
1357 StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1358 )
1359 .build()
1360 .await;
1361
1362 let event_cache = client.event_cache();
1363
1364 event_cache.subscribe().unwrap();
1366 event_cache.enable_storage().unwrap();
1367
1368 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1369 let room = client.get_room(room_id).unwrap();
1370
1371 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1372
1373 let (items, mut stream) = room_event_cache.subscribe().await;
1374
1375 {
1377 assert!(room_event_cache.event(event_id1).await.is_none());
1379 assert!(room_event_cache.event(event_id2).await.is_some());
1381
1382 assert_eq!(items.len(), 1);
1384 assert_eq!(items[0].event_id().unwrap(), event_id2);
1385
1386 assert!(stream.is_empty());
1387 }
1388
1389 {
1391 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1392
1393 assert_let_timeout!(
1394 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1395 );
1396 assert_eq!(diffs.len(), 1);
1397 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: _ });
1398
1399 assert!(room_event_cache.event(event_id1).await.is_some());
1401 assert!(room_event_cache.event(event_id2).await.is_some());
1402
1403 assert!(stream.is_empty());
1404 }
1405
1406 room_event_cache.clear().await.unwrap();
1408
1409 assert_let_timeout!(
1411 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1412 );
1413 assert_eq!(diffs.len(), 1);
1414 assert_let!(VectorDiff::Clear = &diffs[0]);
1415
1416 assert!(room_event_cache.event(event_id1).await.is_none());
1418
1419 let (items, _) = room_event_cache.subscribe().await;
1420 assert!(items.is_empty());
1421
1422 let linked_chunk =
1424 from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1425 .unwrap()
1426 .unwrap();
1427
1428 assert_eq!(linked_chunk.num_items(), 0);
1432 }
1433
1434 #[cfg(not(target_arch = "wasm32"))] #[async_test]
1436 async fn test_load_from_storage() {
1437 use eyeball_im::VectorDiff;
1438
1439 use super::RoomEventCacheUpdate;
1440 use crate::assert_let_timeout;
1441
1442 let room_id = room_id!("!galette:saucisse.bzh");
1443 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1444
1445 let event_cache_store = Arc::new(MemoryStore::new());
1446
1447 let event_id1 = event_id!("$1");
1448 let event_id2 = event_id!("$2");
1449
1450 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1451 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1452
1453 event_cache_store
1455 .handle_linked_chunk_updates(
1456 room_id,
1457 vec![
1458 Update::NewItemsChunk {
1460 previous: None,
1461 new: ChunkIdentifier::new(0),
1462 next: None,
1463 },
1464 Update::NewGapChunk {
1466 previous: Some(ChunkIdentifier::new(0)),
1467 new: ChunkIdentifier::new(42),
1469 next: None,
1470 gap: Gap { prev_token: "cheddar".to_owned() },
1471 },
1472 Update::NewItemsChunk {
1474 previous: Some(ChunkIdentifier::new(42)),
1475 new: ChunkIdentifier::new(1),
1476 next: None,
1477 },
1478 Update::PushItems {
1479 at: Position::new(ChunkIdentifier::new(1), 0),
1480 items: vec![ev1.clone()],
1481 },
1482 Update::NewItemsChunk {
1484 previous: Some(ChunkIdentifier::new(1)),
1485 new: ChunkIdentifier::new(2),
1486 next: None,
1487 },
1488 Update::PushItems {
1489 at: Position::new(ChunkIdentifier::new(2), 0),
1490 items: vec![ev2.clone()],
1491 },
1492 ],
1493 )
1494 .await
1495 .unwrap();
1496
1497 let client = MockClientBuilder::new("http://localhost".to_owned())
1498 .store_config(
1499 StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1500 )
1501 .build()
1502 .await;
1503
1504 let event_cache = client.event_cache();
1505
1506 event_cache.subscribe().unwrap();
1508 event_cache.enable_storage().unwrap();
1509
1510 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1511 let room = client.get_room(room_id).unwrap();
1512
1513 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1514
1515 let (items, mut stream) = room_event_cache.subscribe().await;
1516
1517 assert_eq!(items.len(), 1);
1520 assert_eq!(items[0].event_id().unwrap(), event_id2);
1521 assert!(stream.is_empty());
1522
1523 assert!(room_event_cache.event(event_id1).await.is_none());
1525 assert!(room_event_cache.event(event_id2).await.is_some());
1526
1527 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1529
1530 assert_let_timeout!(
1531 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1532 );
1533 assert_eq!(diffs.len(), 1);
1534 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: _ });
1535
1536 assert!(room_event_cache.event(event_id1).await.is_some());
1538 assert!(room_event_cache.event(event_id2).await.is_some());
1539
1540 assert!(stream.is_empty());
1541
1542 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
1544 room_event_cache
1545 .inner
1546 .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1547 .await
1548 .unwrap();
1549
1550 let (items, _stream) = room_event_cache.subscribe().await;
1555 assert_eq!(items.len(), 2);
1556 assert_eq!(items[0].event_id().unwrap(), event_id1);
1557 assert_eq!(items[1].event_id().unwrap(), event_id2);
1558 }
1559
1560 #[cfg(not(target_arch = "wasm32"))] #[async_test]
1562 async fn test_load_from_storage_resilient_to_failure() {
1563 let room_id = room_id!("!fondue:patate.ch");
1564 let event_cache_store = Arc::new(MemoryStore::new());
1565
1566 let event = EventFactory::new()
1567 .room(room_id)
1568 .sender(user_id!("@ben:saucisse.bzh"))
1569 .text_msg("foo")
1570 .event_id(event_id!("$42"))
1571 .into_event();
1572
1573 event_cache_store
1575 .handle_linked_chunk_updates(
1576 room_id,
1577 vec![
1578 Update::NewItemsChunk {
1579 previous: None,
1580 new: ChunkIdentifier::new(0),
1581 next: None,
1582 },
1583 Update::PushItems {
1584 at: Position::new(ChunkIdentifier::new(0), 0),
1585 items: vec![event],
1586 },
1587 Update::NewItemsChunk {
1588 previous: Some(ChunkIdentifier::new(0)),
1589 new: ChunkIdentifier::new(1),
1590 next: Some(ChunkIdentifier::new(0)),
1591 },
1592 ],
1593 )
1594 .await
1595 .unwrap();
1596
1597 let client = MockClientBuilder::new("http://localhost".to_owned())
1598 .store_config(
1599 StoreConfig::new("holder".to_owned()).event_cache_store(event_cache_store.clone()),
1600 )
1601 .build()
1602 .await;
1603
1604 let event_cache = client.event_cache();
1605
1606 event_cache.subscribe().unwrap();
1608 event_cache.enable_storage().unwrap();
1609
1610 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1611 let room = client.get_room(room_id).unwrap();
1612
1613 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1614
1615 let (items, _stream) = room_event_cache.subscribe().await;
1616
1617 assert!(items.is_empty());
1620
1621 let raw_chunks = event_cache_store.load_all_chunks(room_id).await.unwrap();
1624 assert!(raw_chunks.is_empty());
1625 }
1626
1627 #[cfg(not(target_arch = "wasm32"))] #[async_test]
1629 async fn test_no_useless_gaps() {
1630 let room_id = room_id!("!galette:saucisse.bzh");
1631
1632 let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
1633
1634 let event_cache = client.event_cache();
1635 event_cache.subscribe().unwrap();
1636
1637 let has_storage = true; event_cache.enable_storage().unwrap();
1639
1640 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1641 let room = client.get_room(room_id).unwrap();
1642 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1643
1644 let f = EventFactory::new().room(room_id).sender(*ALICE);
1645
1646 room_event_cache
1649 .inner
1650 .handle_joined_room_update(
1651 has_storage,
1652 JoinedRoomUpdate {
1653 timeline: Timeline {
1654 limited: true,
1655 prev_batch: Some("raclette".to_owned()),
1656 events: vec![f.text_msg("hey yo").into_event()],
1657 },
1658 ..Default::default()
1659 },
1660 )
1661 .await
1662 .unwrap();
1663
1664 {
1665 let state = room_event_cache.inner.state.read().await;
1666
1667 let mut num_gaps = 0;
1668 let mut num_events = 0;
1669
1670 for c in state.events().chunks() {
1671 match c.content() {
1672 ChunkContent::Items(items) => num_events += items.len(),
1673 ChunkContent::Gap(_) => num_gaps += 1,
1674 }
1675 }
1676
1677 assert_eq!(num_gaps, 1);
1679 assert_eq!(num_events, 1);
1680 }
1681
1682 room_event_cache
1685 .inner
1686 .handle_joined_room_update(
1687 has_storage,
1688 JoinedRoomUpdate {
1689 timeline: Timeline {
1690 limited: false,
1691 prev_batch: Some("fondue".to_owned()),
1692 events: vec![f.text_msg("sup").into_event()],
1693 },
1694 ..Default::default()
1695 },
1696 )
1697 .await
1698 .unwrap();
1699
1700 {
1701 let state = room_event_cache.inner.state.read().await;
1702
1703 let mut num_gaps = 0;
1704 let mut num_events = 0;
1705
1706 for c in state.events().chunks() {
1707 match c.content() {
1708 ChunkContent::Items(items) => num_events += items.len(),
1709 ChunkContent::Gap(gap) => {
1710 assert_eq!(gap.prev_token, "raclette");
1711 num_gaps += 1;
1712 }
1713 }
1714 }
1715
1716 assert_eq!(num_gaps, 1);
1718 assert_eq!(num_events, 2);
1719 }
1720 }
1721
1722 async fn assert_relations(
1723 room_id: &RoomId,
1724 original_event: TimelineEvent,
1725 related_event: TimelineEvent,
1726 event_factory: EventFactory,
1727 ) {
1728 let client = logged_in_client(None).await;
1729
1730 let event_cache = client.event_cache();
1731 event_cache.subscribe().unwrap();
1732
1733 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1734 let room = client.get_room(room_id).unwrap();
1735
1736 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1737
1738 let original_event_id = original_event.event_id().unwrap();
1740 room_event_cache.save_event(original_event).await;
1741
1742 let unrelated_id = event_id!("$2");
1744 room_event_cache
1745 .save_event(event_factory.text_msg("An unrelated event").event_id(unrelated_id).into())
1746 .await;
1747
1748 let related_id = related_event.event_id().unwrap();
1750 room_event_cache.save_event(related_event).await;
1751
1752 let (event, related_events) =
1753 room_event_cache.event_with_relations(&original_event_id, None).await.unwrap();
1754 let cached_event_id = event.event_id().unwrap();
1756 assert_eq!(cached_event_id, original_event_id);
1757
1758 assert_eq!(related_events.len(), 1);
1760 let related_event_id = related_events[0].event_id().unwrap();
1761 assert_eq!(related_event_id, related_id);
1762 }
1763}