1#![forbid(missing_docs)]
29
30use std::{
31 collections::BTreeMap,
32 fmt::Debug,
33 sync::{Arc, OnceLock},
34};
35
36use eyeball::{SharedObservable, Subscriber};
37use eyeball_im::VectorDiff;
38use matrix_sdk_base::{
39 deserialized_responses::{AmbiguityChange, TimelineEvent},
40 event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
41 linked_chunk::lazy_loader::LazyLoaderError,
42 store_locks::LockStoreError,
43 sync::RoomUpdates,
44};
45use matrix_sdk_common::executor::{spawn, JoinHandle};
46use once_cell::sync::OnceCell;
47use room::RoomEventCacheState;
48use ruma::{
49 events::{
50 relation::RelationType,
51 room::{message::Relation, redaction::SyncRoomRedactionEvent},
52 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
53 AnySyncTimelineEvent,
54 },
55 serde::Raw,
56 EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
57};
58use tokio::sync::{
59 broadcast::{error::RecvError, Receiver},
60 mpsc, Mutex, RwLock,
61};
62use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span};
63
64use self::paginator::PaginatorError;
65use crate::{client::WeakClient, Client};
66
// Private implementation submodules of the event cache.
mod deduplicator;
mod pagination;
mod room;

// `paginator` is exposed as a public module; the items below are re-exported
// from the private `pagination` and `room` submodules.
pub mod paginator;
pub use pagination::{PaginationToken, RoomPagination, RoomPaginationStatus};
pub use room::{RoomEventCache, RoomEventCacheListener};
74
/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] has not been subscribed to sync responses yet;
    /// [`EventCache::subscribe`] must be called first.
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// An error reported by the paginator (see [`PaginatorError`]); its
    /// message is forwarded as is.
    #[error(transparent)]
    BackpaginationError(#[from] PaginatorError),

    /// A back-pagination was already running.
    ///
    /// NOTE(review): this variant is not constructed in this file; the exact
    /// conditions are defined by the pagination code.
    #[error("We were already back-paginating.")]
    AlreadyBackpaginating,

    /// An error reported by the event cache store (see
    /// [`EventCacheStoreError`]).
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error reported by the store lock (see [`LockStoreError`]).
    #[error(transparent)]
    LockingStorage(#[from] LockStoreError),

    /// The client owning the [`EventCache`] has been dropped, so the weak
    /// reference held by the cache can no longer be upgraded.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error reported by the linked chunk lazy loader (see
    /// [`LazyLoaderError`]).
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),
}
115
/// A result whose error type is [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;
118
/// Holds the handles of the background tasks spawned by
/// [`EventCache::subscribe`].
///
/// The three tasks are aborted when this value is dropped.
pub struct EventCacheDropHandles {
    /// Task running `EventCache::listen_task`, which consumes room updates.
    listen_updates_task: JoinHandle<()>,

    /// Task running `EventCache::ignore_user_list_update_task`, which reacts
    /// to changes of the ignore user list.
    ignore_user_list_update_task: JoinHandle<()>,

    /// Task running `EventCache::auto_shrink_linked_chunk_task`, which
    /// handles auto-shrink notifications.
    auto_shrink_linked_chunk_task: JoinHandle<()>,
}
130
131impl Debug for EventCacheDropHandles {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
134 }
135}
136
137impl Drop for EventCacheDropHandles {
138 fn drop(&mut self) {
139 self.listen_updates_task.abort();
140 self.ignore_user_list_update_task.abort();
141 self.auto_shrink_linked_chunk_task.abort();
142 }
143}
144
/// An event cache, keeping per-room caches of events plus a global map of
/// events indexed by event id.
///
/// Cloning only clones an `Arc`: all clones share the same inner state.
#[derive(Clone)]
pub struct EventCache {
    /// Reference-counted inner state, shared by every clone and by the
    /// background tasks.
    inner: Arc<EventCacheInner>,
}
155
156impl Debug for EventCache {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("EventCache").finish_non_exhaustive()
159 }
160}
161
162impl EventCache {
163 pub(crate) fn new(client: WeakClient) -> Self {
165 Self {
166 inner: Arc::new(EventCacheInner {
167 client,
168 store: Default::default(),
169 multiple_room_updates_lock: Default::default(),
170 by_room: Default::default(),
171 drop_handles: Default::default(),
172 all_events: Default::default(),
173 auto_shrink_sender: Default::default(),
174 }),
175 }
176 }
177
178 pub fn enable_storage(&self) -> Result<()> {
183 let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| {
184 let client = self.inner.client()?;
185 Ok(client.event_cache_store().clone())
186 })?;
187 Ok(())
188 }
189
190 pub fn has_storage(&self) -> bool {
192 self.inner.has_storage()
193 }
194
195 pub fn subscribe(&self) -> Result<()> {
201 let client = self.inner.client()?;
202
203 let _ = self.inner.drop_handles.get_or_init(|| {
204 let listen_updates_task = spawn(Self::listen_task(
206 self.inner.clone(),
207 client.subscribe_to_all_room_updates(),
208 ));
209
210 let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
211 self.inner.clone(),
212 client.subscribe_to_ignore_user_list_changes(),
213 ));
214
215 let (tx, rx) = mpsc::channel(32);
216
217 self.inner.auto_shrink_sender.get_or_init(|| tx);
219
220 let auto_shrink_linked_chunk_tasks =
221 spawn(Self::auto_shrink_linked_chunk_task(self.inner.clone(), rx));
222
223 Arc::new(EventCacheDropHandles {
224 listen_updates_task,
225 ignore_user_list_update_task,
226 auto_shrink_linked_chunk_task: auto_shrink_linked_chunk_tasks,
227 })
228 });
229
230 Ok(())
231 }
232
233 pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
237 self.inner
238 .all_events
239 .read()
240 .await
241 .events
242 .get(event_id)
243 .map(|(_room_id, event)| event.clone())
244 }
245
246 #[cfg(any(test, feature = "testing"))]
256 pub async fn empty_immutable_cache(&self) {
257 self.inner.all_events.write().await.events.clear();
258 }
259
260 #[instrument(skip_all)]
261 async fn ignore_user_list_update_task(
262 inner: Arc<EventCacheInner>,
263 mut ignore_user_list_stream: Subscriber<Vec<String>>,
264 ) {
265 let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
266 span.follows_from(Span::current());
267
268 async move {
269 while ignore_user_list_stream.next().await.is_some() {
270 info!("Received an ignore user list change");
271 if let Err(err) = inner.clear_all_rooms().await {
272 error!("when clearing room storage after ignore user list change: {err}");
273 }
274 }
275 info!("Ignore user list stream has closed");
276 }
277 .instrument(span)
278 .await;
279 }
280
281 #[instrument(skip_all)]
282 async fn listen_task(
283 inner: Arc<EventCacheInner>,
284 mut room_updates_feed: Receiver<RoomUpdates>,
285 ) {
286 trace!("Spawning the listen task");
287 loop {
288 match room_updates_feed.recv().await {
289 Ok(updates) => {
290 if let Err(err) = inner.handle_room_updates(updates).await {
291 match err {
292 EventCacheError::ClientDropped => {
293 info!("Closing the event cache global listen task because client dropped");
295 break;
296 }
297 err => {
298 error!("Error when handling room updates: {err}");
299 }
300 }
301 }
302 }
303
304 Err(RecvError::Lagged(num_skipped)) => {
305 warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
309 if let Err(err) = inner.clear_all_rooms().await {
310 error!("when clearing storage after lag in listen_task: {err}");
311 }
312 }
313
314 Err(RecvError::Closed) => {
315 info!("Closing the event cache global listen task because receiver closed");
317 break;
318 }
319 }
320 }
321 }
322
323 #[instrument(skip_all)]
337 async fn auto_shrink_linked_chunk_task(
338 inner: Arc<EventCacheInner>,
339 mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
340 ) {
341 while let Some(room_id) = rx.recv().await {
342 trace!(for_room = %room_id, "received notification to shrink");
343
344 let room = match inner.for_room(&room_id).await {
345 Ok(room) => room,
346 Err(err) => {
347 warn!(for_room = %room_id, "error when getting a RoomEventCache: {err}");
348 continue;
349 }
350 };
351
352 trace!("waiting for state lock…");
353 let mut state = room.inner.state.write().await;
354
355 match state.auto_shrink_if_no_listeners().await {
356 Ok(diffs) => {
357 if let Some(diffs) = diffs {
358 if !diffs.is_empty() {
365 let _ = room.inner.sender.send(
366 RoomEventCacheUpdate::UpdateTimelineEvents {
367 diffs,
368 origin: EventsOrigin::Cache,
369 },
370 );
371 }
372 } else {
373 debug!("auto-shrinking didn't happen");
374 }
375 }
376
377 Err(err) => {
378 warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
380 }
381 }
382 }
383 }
384
385 pub(crate) async fn for_room(
387 &self,
388 room_id: &RoomId,
389 ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
390 let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
391 return Err(EventCacheError::NotSubscribedYet);
392 };
393
394 let room = self.inner.for_room(room_id).await?;
395
396 Ok((room, drop_handles))
397 }
398
399 #[instrument(skip(self, events))]
404 pub async fn add_initial_events(
405 &self,
406 room_id: &RoomId,
407 events: Vec<TimelineEvent>,
408 prev_batch: Option<String>,
409 ) -> Result<()> {
410 if self.inner.has_storage() {
412 return Ok(());
413 }
414
415 let room_cache = self.inner.for_room(room_id).await?;
416
417 if !room_cache.inner.state.read().await.events().is_empty() {
420 return Ok(());
421 }
422
423 room_cache
428 .inner
429 .replace_all_events_by(
430 events,
431 prev_batch,
432 Default::default(),
433 Default::default(),
434 EventsOrigin::Cache,
435 )
436 .await?;
437
438 Ok(())
439 }
440}
441
/// Maps an event id to a pair made of a room id and the event itself.
type AllEventsMap = BTreeMap<OwnedEventId, (OwnedRoomId, TimelineEvent)>;
/// Maps an event id to the events relating to it: for each related event id,
/// the type of the relation.
type RelationsMap = BTreeMap<OwnedEventId, BTreeMap<OwnedEventId, RelationType>>;
444
/// Cache of events indexed by event id, along with the relations recorded
/// between events.
#[derive(Default, Clone)]
struct AllEventsCache {
    /// The events, indexed by their event id.
    events: AllEventsMap,
    /// For each event id, the events known to relate to it. Filled by
    /// `append_related_event`.
    relations: RelationsMap,
}
455
456impl AllEventsCache {
457 fn clear(&mut self) {
458 self.events.clear();
459 self.relations.clear();
460 }
461
462 fn append_related_event(&mut self, event: &TimelineEvent) {
465 let Ok(AnySyncTimelineEvent::MessageLike(ev)) = event.raw().deserialize() else {
467 return;
468 };
469
470 if let AnySyncMessageLikeEvent::RoomRedaction(room_redaction) = &ev {
472 let redacted_event_id = match room_redaction {
473 SyncRoomRedactionEvent::Original(ev) => {
474 ev.content.redacts.as_ref().or(ev.redacts.as_ref())
475 }
476 SyncRoomRedactionEvent::Redacted(redacted_redaction) => {
477 redacted_redaction.content.redacts.as_ref()
478 }
479 };
480
481 if let Some(redacted_event_id) = redacted_event_id {
482 self.relations
483 .entry(redacted_event_id.to_owned())
484 .or_default()
485 .insert(ev.event_id().to_owned(), RelationType::Replacement);
486 }
487
488 return;
489 }
490
491 let relationship = match ev.original_content() {
492 Some(AnyMessageLikeEventContent::RoomMessage(c)) => {
493 if let Some(relation) = c.relates_to {
494 match relation {
495 Relation::Replacement(replacement) => {
496 Some((replacement.event_id, RelationType::Replacement))
497 }
498 Relation::Reply { in_reply_to } => {
499 Some((in_reply_to.event_id, RelationType::Reference))
500 }
501 Relation::Thread(thread) => Some((thread.event_id, RelationType::Thread)),
502 _ => None,
504 }
505 } else {
506 None
507 }
508 }
509 Some(AnyMessageLikeEventContent::PollResponse(c)) => {
510 Some((c.relates_to.event_id, RelationType::Reference))
511 }
512 Some(AnyMessageLikeEventContent::PollEnd(c)) => {
513 Some((c.relates_to.event_id, RelationType::Reference))
514 }
515 Some(AnyMessageLikeEventContent::UnstablePollResponse(c)) => {
516 Some((c.relates_to.event_id, RelationType::Reference))
517 }
518 Some(AnyMessageLikeEventContent::UnstablePollEnd(c)) => {
519 Some((c.relates_to.event_id, RelationType::Reference))
520 }
521 Some(AnyMessageLikeEventContent::Reaction(c)) => {
522 Some((c.relates_to.event_id, RelationType::Annotation))
523 }
524 _ => None,
525 };
526
527 if let Some(relationship) = relationship {
528 self.relations
529 .entry(relationship.0)
530 .or_default()
531 .insert(ev.event_id().to_owned(), relationship.1);
532 }
533 }
534
535 fn collect_related_events(
539 &self,
540 event_id: &EventId,
541 filter: Option<&[RelationType]>,
542 ) -> Vec<TimelineEvent> {
543 let mut results = Vec::new();
544 self.collect_related_events_rec(event_id, filter, &mut results);
545 results
546 }
547
548 fn collect_related_events_rec(
549 &self,
550 event_id: &EventId,
551 filter: Option<&[RelationType]>,
552 results: &mut Vec<TimelineEvent>,
553 ) {
554 let Some(related_event_ids) = self.relations.get(event_id) else {
555 return;
556 };
557
558 for (related_event_id, relation_type) in related_event_ids {
559 if let Some(filter) = filter {
560 if !filter.contains(relation_type) {
561 continue;
562 }
563 }
564
565 if results.iter().any(|event| {
567 event.event_id().is_some_and(|added_related_event_id| {
568 added_related_event_id == *related_event_id
569 })
570 }) {
571 continue;
572 }
573
574 if let Some((_, ev)) = self.events.get(related_event_id) {
575 results.push(ev.clone());
576 self.collect_related_events_rec(related_event_id, filter, results);
577 }
578 }
579 }
580}
581
/// The inner state shared by every clone of an `EventCache` and by its
/// background tasks.
struct EventCacheInner {
    /// Weak reference to the owning client; see `EventCacheInner::client`
    /// for the upgrade.
    client: WeakClient,

    /// The event cache store lock, set at most once (by
    /// `EventCache::enable_storage`). The cell is shared with the state of
    /// each room created by `for_room`.
    store: Arc<OnceCell<EventCacheStoreLock>>,

    /// Lock held for the whole duration of `handle_room_updates`, so that
    /// batches of room updates are handled one at a time.
    multiple_room_updates_lock: Mutex<()>,

    /// The room caches, created on demand by `for_room` and indexed by room
    /// id.
    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,

    /// The events indexed by event id, and their relations; a handle to it
    /// is given to each room cache.
    all_events: Arc<RwLock<AllEventsCache>>,

    /// Handles to the background tasks, set at most once (by
    /// `EventCache::subscribe`).
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// Sending half of the auto-shrink channel, set at most once (by
    /// `EventCache::subscribe`); a clone is given to each room cache.
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,
}
625
/// Payload of the auto-shrink channel: the id of the room for which an
/// auto-shrink must be attempted.
type AutoShrinkChannelPayload = OwnedRoomId;
627
628impl EventCacheInner {
629 fn client(&self) -> Result<Client> {
630 self.client.get().ok_or(EventCacheError::ClientDropped)
631 }
632
633 fn has_storage(&self) -> bool {
635 self.store.get().is_some()
636 }
637
638 async fn clear_all_rooms(&self) -> Result<()> {
640 let rooms = self.by_room.write().await;
648 for room in rooms.values() {
649 room.clear().await?;
650 }
651
652 Ok(())
653 }
654
655 #[instrument(skip(self, updates))]
657 async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
658 let _lock = self.multiple_room_updates_lock.lock().await;
661
662 for (room_id, left_room_update) in updates.leave {
664 let room = self.for_room(&room_id).await?;
665
666 if let Err(err) =
667 room.inner.handle_left_room_update(self.has_storage(), left_room_update).await
668 {
669 error!("handling left room update: {err}");
671 }
672 }
673
674 for (room_id, joined_room_update) in updates.join {
676 let room = self.for_room(&room_id).await?;
677
678 if let Err(err) =
679 room.inner.handle_joined_room_update(self.has_storage(), joined_room_update).await
680 {
681 error!(%room_id, "handling joined room update: {err}");
683 }
684 }
685
686 Ok(())
690 }
691
692 async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
697 let by_room_guard = self.by_room.read().await;
700
701 match by_room_guard.get(room_id) {
702 Some(room) => Ok(room.clone()),
703
704 None => {
705 drop(by_room_guard);
707 let mut by_room_guard = self.by_room.write().await;
708
709 if let Some(room) = by_room_guard.get(room_id) {
712 return Ok(room.clone());
713 }
714
715 let pagination_status =
716 SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });
717
718 let room_version = self
719 .client
720 .get()
721 .and_then(|client| client.get_room(room_id))
722 .as_ref()
723 .map(|room| room.clone_info().room_version_or_default())
724 .unwrap_or_else(|| {
725 warn!("unknown room version for {room_id}, using default V1");
726 RoomVersionId::V1
727 });
728
729 let room_state = RoomEventCacheState::new(
730 room_id.to_owned(),
731 room_version,
732 self.store.clone(),
733 pagination_status.clone(),
734 )
735 .await?;
736
737 let auto_shrink_sender =
740 self.auto_shrink_sender.get().cloned().expect(
741 "we must have called `EventCache::subscribe()` before calling here.",
742 );
743
744 let room_event_cache = RoomEventCache::new(
745 self.client.clone(),
746 room_state,
747 pagination_status,
748 room_id.to_owned(),
749 self.all_events.clone(),
750 auto_shrink_sender,
751 );
752
753 by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
754
755 Ok(room_event_cache)
756 }
757 }
758 }
759}
760
/// The result of a single back-pagination request.
///
/// NOTE(review): this type is not constructed in this file; the field
/// descriptions below are derived from the names only and should be
/// confirmed against the pagination code.
#[derive(Debug)]
pub struct BackPaginationOutcome {
    /// Presumably: whether the back-pagination reached the start of the
    /// timeline.
    pub reached_start: bool,

    /// The events returned by the back-pagination. Their ordering is not
    /// established by this file.
    pub events: Vec<TimelineEvent>,
}
775
/// An update sent to the subscribers of a room event cache.
#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
    /// The read marker (fully read marker) has moved to another event.
    MoveReadMarkerTo {
        /// The id of the event the read marker now points to.
        event_id: OwnedEventId,
    },

    /// The members of the room have changed.
    UpdateMembers {
        /// Ambiguity changes, indexed by event id.
        ///
        /// NOTE(review): presumably the id of the member event causing each
        /// change; confirm where this variant is built.
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    },

    /// The timeline events of the room have changed.
    UpdateTimelineEvents {
        /// The changes, as a list of diffs over the vector of events.
        diffs: Vec<VectorDiff<TimelineEvent>>,

        /// Where the changes come from.
        origin: EventsOrigin,
    },

    /// New ephemeral events have been received for the room.
    AddEphemeralEvents {
        /// The raw ephemeral events.
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    },
}
810
/// Indicates where a set of events comes from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// The events come from a sync.
    Sync,

    /// The events come from a pagination.
    Pagination,

    /// The events come from the cache itself. In this file, it is the origin
    /// used for the updates of an auto-shrink and for the initial events.
    Cache,
}
823
#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::sync::{JoinedRoomUpdate, RoomUpdates, Timeline};
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{event_id, room_id, serde::Raw, user_id};
    use serde_json::json;

    use super::{EventCacheError, RoomEventCacheUpdate};
    use crate::test_utils::{assert_event_matches_msg, logged_in_client};

    /// Asking for a room cache before `subscribe()` has been called must
    /// fail with `NotSubscribedYet`.
    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // `subscribe()` is deliberately not called on the event cache.
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    /// Many identical read marker events in a single update must result in a
    /// single `MoveReadMarkerTo` update.
    #[async_test]
    async fn test_uniq_read_marker() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();

        let (events, mut stream) = room_event_cache.subscribe().await;

        // The room cache starts empty.
        assert!(events.is_empty());

        // Build 100 copies of the same read marker event.
        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .inner
            .handle_joined_room_update(
                event_cache.inner.has_storage(),
                JoinedRoomUpdate { account_data, ..Default::default() },
            )
            .await
            .unwrap();

        // One read marker update is received…
        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        // …and nothing else is immediately available.
        assert!(stream.recv().now_or_never().is_none());
    }

    /// Events received through room updates can be found by id, both from
    /// the global cache and from the cache of their room.
    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Note: the factory is configured with the first room only.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        // Two events for the first room.
        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        // One event for the second room.
        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.join.insert(room_id1.to_owned(), joined_room_update1);
        updates.join.insert(room_id2.to_owned(), joined_room_update2);

        // Feed the updates directly, without going through the listen task.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // The three events are found through the global cache.
        let found1 = event_cache.event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = event_cache.event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        let found3 = event_cache.event(eid3).await.unwrap();
        assert_event_matches_msg(&found3, "bjr");

        // An unknown event id is not found.
        assert!(event_cache.event(event_id!("$unknown")).await.is_none());

        // Now look the events up from the cache of the first room.
        client.base_client().get_or_create_room(room_id1, matrix_sdk_base::RoomState::Joined);
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        // The event of the second room is not visible from the first room,
        // but it is still known by the global cache.
        assert!(room_event_cache.event(eid3).await.is_none());
        assert!(event_cache.event(eid3).await.is_some());
    }

    /// An event saved through a room cache is found until the global map of
    /// events is emptied.
    #[async_test]
    async fn test_save_event_and_clear() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_event(f.text_msg("hey there").event_id(event_id).into()).await;

        // The saved event is found from the room cache and the global cache.
        assert!(room_event_cache.event(event_id).await.is_some());
        assert!(event_cache.event(event_id).await.is_some());

        event_cache.empty_immutable_cache().await;

        // After emptying the map of events, it is found from neither.
        assert!(room_event_cache.event(event_id).await.is_none());
        assert!(event_cache.event(event_id).await.is_none());
    }

    /// Initial events added to an empty room cache are returned to a new
    /// subscriber.
    #[async_test]
    async fn test_add_initial_events() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // `enable_storage()` is never called here, so `add_initial_events`
        // does not return early.
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        event_cache
            .add_initial_events(room_id, vec![f.text_msg("hey").into()], None)
            .await
            .unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        let (initial_events, _) = room_event_cache.subscribe().await;
        assert_eq!(initial_events.len(), 1);
    }
}