1#![forbid(missing_docs)]
29
30use std::{
31 collections::HashMap,
32 fmt,
33 ops::{Deref, DerefMut},
34 sync::{Arc, OnceLock},
35};
36
37use futures_util::future::try_join_all;
38use matrix_sdk_base::{
39 cross_process_lock::CrossProcessLockError,
40 event_cache::store::{EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockState},
41 linked_chunk::lazy_loader::LazyLoaderError,
42 sync::RoomUpdates,
43 task_monitor::BackgroundTaskHandle,
44 timer,
45};
46use ruma::{OwnedRoomId, RoomId};
47use tokio::sync::{
48 Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock,
49 broadcast::{Receiver, Sender, channel},
50 mpsc,
51};
52use tracing::{error, instrument, trace};
53
54use crate::{
55 Client,
56 client::{ClientInner, WeakClient},
57 paginators::PaginatorError,
58};
59
60mod caches;
61mod deduplicator;
62mod persistence;
63#[cfg(feature = "e2e-encryption")]
64mod redecryptor;
65mod tasks;
66
67use caches::{Caches, room::RoomEventCacheLinkedChunkUpdate};
68pub use caches::{
69 TimelineVectorDiffs,
70 event_focused::EventFocusThreadMode,
71 pagination::{BackPaginationOutcome, PaginationStatus},
72 room::{
73 RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheSubscriber,
74 RoomEventCacheUpdate, pagination::RoomPagination,
75 },
76 thread::pagination::ThreadPagination,
77};
78#[cfg(feature = "e2e-encryption")]
79pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport};
80
/// An error observing the [`EventCache`].
#[derive(thiserror::Error, Clone, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] instance hasn't subscribed to sync responses yet;
    /// call [`EventCache::subscribe()`] first.
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// The room identified by `room_id` isn't known to the event cache.
    #[error("Room `{room_id}` is not found.")]
    RoomNotFound {
        /// The identifier of the room that wasn't found.
        room_id: OwnedRoomId,
    },

    /// An error happened while paginating; the underlying error is wrapped in
    /// an [`Arc`] so this enum stays [`Clone`].
    #[error(transparent)]
    PaginationError(Arc<crate::Error>),

    /// An error reported by the paginator during an initial pagination.
    #[error(transparent)]
    InitialPaginationError(#[from] PaginatorError),

    /// An error coming from the underlying event cache store.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error while acquiring the cross-process lock guarding the store.
    #[error(transparent)]
    LockingStorage(#[from] CrossProcessLockError),

    /// The [`Client`] owning this event cache has been dropped, so the weak
    /// reference could not be upgraded.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error while lazily loading a linked chunk from the store.
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),

    /// None of the pinned events could be loaded.
    #[error("Unable to load any of the pinned events.")]
    UnableToLoadPinnedEvents,

    /// The linked chunk metadata read from the store is inconsistent.
    #[error("the linked chunk metadata is invalid: {details}")]
    InvalidLinkedChunkMetadata {
        /// A human-readable description of the inconsistency.
        details: String,
    },
}
141
142pub type Result<T> = std::result::Result<T, EventCacheError>;
144
/// Holds the handles to the background tasks spawned by the [`EventCache`];
/// dropping this aborts them (see the `Drop` implementation below).
pub struct EventCacheDropHandles {
    /// Task listening to room updates coming from sync.
    listen_updates_task: BackgroundTaskHandle,

    /// Task listening to changes of the ignored-user list.
    ignore_user_list_update_task: BackgroundTaskHandle,

    /// Task automatically shrinking rooms' linked chunks.
    auto_shrink_linked_chunk_task: BackgroundTaskHandle,

    /// The redecryptor, kept alive here only for its lifetime (its own drop
    /// stops its work).
    #[cfg(feature = "e2e-encryption")]
    _redecryptor: redecryptor::Redecryptor,
}
160
impl fmt::Debug for EventCacheDropHandles {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The task handles carry no information worth printing; only show the
        // struct name.
        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
    }
}
166
impl Drop for EventCacheDropHandles {
    fn drop(&mut self) {
        // Abort all the background tasks when the handles are dropped, so they
        // don't outlive the event cache.
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
        self.auto_shrink_linked_chunk_task.abort();
    }
}
174
/// A cache of events observed by the client, shared across all rooms.
///
/// Cheap to clone: all the state lives behind an [`Arc`].
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache: shared and reference-counted, so clones
    /// of `EventCache` observe the same state.
    inner: Arc<EventCacheInner>,
}
185
impl fmt::Debug for EventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The inner state isn't `Debug`; only show the struct name.
        f.debug_struct("EventCache").finish_non_exhaustive()
    }
}
191
192impl EventCache {
193 pub(crate) fn new(client: &Arc<ClientInner>, event_cache_store: EventCacheStoreLock) -> Self {
195 let (generic_update_sender, _) = channel(128);
196 let (linked_chunk_update_sender, _) = channel(128);
197
198 let weak_client = WeakClient::from_inner(client);
199
200 let (thread_subscriber_sender, _thread_subscriber_receiver) = channel(128);
201 let thread_subscriber_task = client
202 .task_monitor
203 .spawn_background_task(
204 "event_cache::thread_subscriber",
205 tasks::thread_subscriber_task(
206 weak_client.clone(),
207 linked_chunk_update_sender.clone(),
208 thread_subscriber_sender,
209 ),
210 )
211 .abort_on_drop();
212
213 #[cfg(feature = "experimental-search")]
214 let search_indexing_task = client
215 .task_monitor
216 .spawn_background_task(
217 "event_cache::search_indexing",
218 tasks::search_indexing_task(
219 weak_client.clone(),
220 linked_chunk_update_sender.clone(),
221 ),
222 )
223 .abort_on_drop();
224
225 #[cfg(feature = "e2e-encryption")]
226 let redecryption_channels = redecryptor::RedecryptorChannels::new();
227
228 Self {
229 inner: Arc::new(EventCacheInner {
230 client: weak_client,
231 config: RwLock::new(EventCacheConfig::default()),
232 store: event_cache_store,
233 multiple_room_updates_lock: Default::default(),
234 by_room: Default::default(),
235 drop_handles: Default::default(),
236 auto_shrink_sender: Default::default(),
237 generic_update_sender,
238 linked_chunk_update_sender,
239 _thread_subscriber_task: thread_subscriber_task,
240 #[cfg(feature = "experimental-search")]
241 _search_indexing_task: search_indexing_task,
242 #[cfg(feature = "e2e-encryption")]
243 redecryption_channels,
244 #[cfg(feature = "testing")]
245 thread_subscriber_receiver: _thread_subscriber_receiver,
246 }),
247 }
248 }
249
250 pub async fn config(&self) -> impl Deref<Target = EventCacheConfig> + '_ {
253 self.inner.config.read().await
254 }
255
256 pub async fn config_mut(&self) -> impl DerefMut<Target = EventCacheConfig> + '_ {
258 self.inner.config.write().await
259 }
260
261 #[cfg(feature = "testing")]
265 pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
266 self.inner.thread_subscriber_receiver.resubscribe()
267 }
268
269 pub fn subscribe(&self) -> Result<()> {
275 let client = self.inner.client()?;
276
277 let _ = self.inner.drop_handles.get_or_init(|| {
279 let task_monitor = client.task_monitor();
280
281 let listen_updates_task = task_monitor.spawn_background_task("event_cache::room_updates_task", tasks::room_updates_task(
283 self.inner.clone(),
284 client.subscribe_to_all_room_updates(),
285 ));
286
287 let ignore_user_list_update_task = task_monitor.spawn_background_task("event_cache::ignore_user_list_update_task", tasks::ignore_user_list_update_task(
288 self.inner.clone(),
289 client.subscribe_to_ignore_user_list_changes(),
290 ));
291
292 let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);
293
294 self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);
296
297 let auto_shrink_linked_chunk_task = task_monitor.spawn_background_task("event_cache::auto_shrink_linked_chunk_task", tasks::auto_shrink_linked_chunk_task(
298 Arc::downgrade(&self.inner),
299 auto_shrink_receiver,
300 ));
301
302 #[cfg(feature = "e2e-encryption")]
303 let redecryptor = {
304 let receiver = self
305 .inner
306 .redecryption_channels
307 .decryption_request_receiver
308 .lock()
309 .take()
310 .expect("We should have initialized the channel an subscribing should happen only once");
311
312 redecryptor::Redecryptor::new(&client, Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender)
313 };
314
315
316 Arc::new(EventCacheDropHandles {
317 listen_updates_task,
318 ignore_user_list_update_task,
319 auto_shrink_linked_chunk_task,
320 #[cfg(feature = "e2e-encryption")]
321 _redecryptor: redecryptor,
322 })
323 });
324
325 Ok(())
326 }
327
328 #[doc(hidden)]
330 pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
331 self.inner.handle_room_updates(updates).await
332 }
333
334 pub fn has_subscribed(&self) -> bool {
336 self.inner.drop_handles.get().is_some()
337 }
338
339 pub(crate) async fn for_room(
341 &self,
342 room_id: &RoomId,
343 ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
344 let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
345 return Err(EventCacheError::NotSubscribedYet);
346 };
347
348 let room = self.inner.all_caches_for_room(room_id).await?.room.clone();
349
350 Ok((room, drop_handles))
351 }
352
353 pub async fn clear_all_rooms(&self) -> Result<()> {
357 self.inner.clear_all_rooms().await
358 }
359
360 pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
370 self.inner.generic_update_sender.subscribe()
371 }
372}
373
/// Configuration knobs for the [`EventCache`].
#[derive(Clone, Copy, Debug)]
pub struct EventCacheConfig {
    /// Maximum number of concurrent requests when loading pinned events.
    pub max_pinned_events_concurrent_requests: usize,

    /// Maximum number of pinned events to load.
    pub max_pinned_events_to_load: usize,
}
383
impl EventCacheConfig {
    /// Default value for [`EventCacheConfig::max_pinned_events_to_load`].
    const DEFAULT_MAX_EVENTS_TO_LOAD: usize = 128;

    /// Default value for
    /// [`EventCacheConfig::max_pinned_events_concurrent_requests`].
    const DEFAULT_MAX_CONCURRENT_REQUESTS: usize = 8;
}
392
393impl Default for EventCacheConfig {
394 fn default() -> Self {
395 Self {
396 max_pinned_events_concurrent_requests: Self::DEFAULT_MAX_CONCURRENT_REQUESTS,
397 max_pinned_events_to_load: Self::DEFAULT_MAX_EVENTS_TO_LOAD,
398 }
399 }
400}
401
402type CachesByRoom = HashMap<OwnedRoomId, Caches>;
403
/// State shared by all clones of an [`EventCache`].
struct EventCacheInner {
    /// A weak reference to the inner client; avoids a reference cycle with
    /// the owning client.
    client: WeakClient,

    /// The configuration, exposed via [`EventCache::config`] and
    /// [`EventCache::config_mut`].
    config: RwLock<EventCacheConfig>,

    /// Reference to the underlying store, behind its cross-process lock.
    store: EventCacheStoreLock,

    /// Lock serializing the handling of whole sets of room updates, so two
    /// batches are never processed concurrently.
    multiple_room_updates_lock: Mutex<()>,

    /// All the per-room caches, created lazily in
    /// [`EventCacheInner::all_caches_for_room`].
    by_room: Arc<RwLock<CachesByRoom>>,

    /// Handles to the background tasks; set exactly once by
    /// [`EventCache::subscribe`].
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// Sender used to request the auto-shrink of a room's linked chunk; set
    /// exactly once by [`EventCache::subscribe`].
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,

    /// Broadcast sender for generic room-level cache updates.
    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

    /// Broadcast sender for linked-chunk updates, shared with the
    /// thread-subscriber (and search-indexing) tasks.
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// Handle to the thread-subscriber task; kept only so the task lives as
    /// long as the cache.
    _thread_subscriber_task: BackgroundTaskHandle,

    /// Handle to the search-indexing task; kept only so the task lives as
    /// long as the cache.
    #[cfg(feature = "experimental-search")]
    _search_indexing_task: BackgroundTaskHandle,

    /// Receiver of the thread-subscriber notifications, exposed to tests via
    /// [`EventCache::subscribe_thread_subscriber_updates`].
    #[cfg(feature = "testing")]
    thread_subscriber_receiver: Receiver<()>,

    /// Channels used to communicate with the redecryptor.
    #[cfg(feature = "e2e-encryption")]
    redecryption_channels: redecryptor::RedecryptorChannels,
}
482
483type AutoShrinkChannelPayload = OwnedRoomId;
484
485impl EventCacheInner {
486 fn client(&self) -> Result<Client> {
487 self.client.get().ok_or(EventCacheError::ClientDropped)
488 }
489
490 async fn clear_all_rooms(&self) -> Result<()> {
492 let mut all_caches = self.by_room.write().await;
533
534 let resets =
537 try_join_all(all_caches.values_mut().map(|caches| caches.prepare_to_reset())).await?;
538
539 let store_guard = match self.store.lock().await? {
541 EventCacheStoreLockState::Clean(store_guard) => store_guard,
542 EventCacheStoreLockState::Dirty(store_guard) => store_guard,
543 };
544 store_guard.clear_all_linked_chunks().await?;
545
546 try_join_all(resets.into_iter().map(|reset_cache| reset_cache.reset_all())).await?;
549
550 Ok(())
551 }
552
553 #[instrument(skip(self, updates))]
555 async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
556 let _lock = {
559 let _timer = timer!("Taking the `multiple_room_updates_lock`");
560 self.multiple_room_updates_lock.lock().await
561 };
562
563 for (room_id, left_room_update) in updates.left {
570 let Ok(caches) = self.all_caches_for_room(&room_id).await else {
571 error!(?room_id, "Room must exist");
572 continue;
573 };
574
575 if let Err(err) = caches.handle_left_room_update(left_room_update).await {
576 error!("handling left room update: {err}");
578 }
579 }
580
581 for (room_id, joined_room_update) in updates.joined {
583 trace!(?room_id, "Handling a `JoinedRoomUpdate`");
584
585 let Ok(caches) = self.all_caches_for_room(&room_id).await else {
586 error!(?room_id, "Room must exist");
587 continue;
588 };
589
590 if let Err(err) = caches.handle_joined_room_update(joined_room_update).await {
591 error!(%room_id, "handling joined room update: {err}");
593 }
594 }
595
596 Ok(())
600 }
601
602 async fn all_caches_for_room(
604 &self,
605 room_id: &RoomId,
606 ) -> Result<OwnedRwLockReadGuard<CachesByRoom, Caches>> {
607 match OwnedRwLockReadGuard::try_map(self.by_room.clone().read_owned().await, |by_room| {
610 by_room.get(room_id)
611 }) {
612 Ok(caches) => Ok(caches),
613
614 Err(by_room_guard) => {
615 drop(by_room_guard);
617 let by_room_guard = self.by_room.clone().write_owned().await;
618
619 let mut by_room_guard =
622 match OwnedRwLockWriteGuard::try_downgrade_map(by_room_guard, |by_room| {
623 by_room.get(room_id)
624 }) {
625 Ok(caches) => return Ok(caches),
626 Err(by_room_guard) => by_room_guard,
627 };
628
629 let caches = Caches::new(
630 &self.client,
631 room_id,
632 self.generic_update_sender.clone(),
633 self.linked_chunk_update_sender.clone(),
634 self.auto_shrink_sender.get().cloned().expect(
637 "we must have called `EventCache::subscribe()` before calling here.",
638 ),
639 self.store.clone(),
640 )
641 .await?;
642
643 by_room_guard.insert(room_id.to_owned(), caches);
644
645 Ok(OwnedRwLockWriteGuard::try_downgrade_map(by_room_guard, |by_room| {
646 by_room.get(room_id)
647 })
648 .expect("`Caches` has just been inserted"))
649 }
650 }
651 }
652}
653
/// Where a batch of events came from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// The events came from a sync response.
    Sync,

    /// The events came from pagination.
    Pagination,

    /// The events came from the cache itself.
    Cache,
}
666
#[cfg(test)]
mod tests {
    use std::{ops::Not, sync::Arc, time::Duration};

    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::{
        RoomState,
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
    };
    use matrix_sdk_test::{
        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
    };
    use ruma::{event_id, room_id, user_id};
    use tokio::time::sleep;

    use super::{EventCacheError, RoomEventCacheGenericUpdate};
    use crate::test_utils::{
        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
    };

    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // Accessing a room before calling `subscribe()` must fail.
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
        client.base_client().get_or_create_room(room_id2, RoomState::Joined);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Two events in the first room, one in the second.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
        updates.joined.insert(room_id2.to_owned(), joined_room_update2);

        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // The first room's cache can find its own events...
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.find_event(eid1).await.unwrap().unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.find_event(eid2).await.unwrap().unwrap();
        assert_event_matches_msg(&found2, "you");

        // ...but not the event that belongs to the other room.
        assert!(room_event_cache.find_event(eid3).await.unwrap().is_none());
    }

    #[async_test]
    async fn test_save_event() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;

        // A saved event can be found again by its id.
        assert!(room_event_cache.find_event(event_id).await.unwrap().is_some());
    }

    #[async_test]
    async fn test_generic_update_when_loading_rooms() {
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id_0 = room_id!("!raclette:patate.ch");
        let room_id_1 = room_id!("!fondue:patate.ch");

        let event_factory = EventFactory::new().room(room_id_0).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        // Pre-populate the store with one chunk of events for the first room.
        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id_0),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        {
            // Loading a room with stored events emits a generic update.
            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();

            assert_matches!(
                generic_stream.recv().await,
                Ok(RoomEventCacheGenericUpdate { room_id }) => {
                    assert_eq!(room_id, room_id_0);
                }
            );
        }

        {
            // Loading a room with no stored events emits nothing.
            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();

            assert!(generic_stream.recv().now_or_never().is_none());
        }
    }

    #[async_test]
    async fn test_generic_update_when_paginating_room() {
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_factory = EventFactory::new().room(room_id).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        // Pre-populate the store with several chunks: an empty one, another
        // empty one, then one event per chunk for the last two.
        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(2)),
                        new: ChunkIdentifier::new(3),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(3), 0),
                        items: vec![
                            event_factory
                                .text_msg("world")
                                .sender(user)
                                .event_id(event_id!("$ev1"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Opening the room emits a first generic update.
        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let pagination = room_event_cache.pagination();

        // A pagination loading one event emits a generic update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert_eq!(pagination_outcome.events.len(), 1);
        assert!(pagination_outcome.reached_start.not());
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        // A pagination loading no event emits no generic update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.events.is_empty());
        assert!(pagination_outcome.reached_start.not());
        assert!(generic_stream.recv().now_or_never().is_none());

        // Reaching the start emits no generic update either.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.reached_start);
        assert!(generic_stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_for_room_when_room_is_not_found() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Unknown room: `for_room` must fail with `RoomNotFound`.
        assert_matches!(
            event_cache.for_room(room_id).await,
            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
                assert_eq!(room_id, not_found_room_id);
            }
        );

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        // Once the room exists, `for_room` succeeds.
        assert!(event_cache.for_room(room_id).await.is_ok());
    }

    // NOTE(review): relies on real timers, hence not run on wasm.
    #[cfg(not(target_family = "wasm"))]
    #[async_test]
    async fn test_no_refcycle_event_cache_tasks() {
        let client = MockClientBuilder::new(None).build().await;

        // Give the background tasks a chance to start up.
        sleep(Duration::from_secs(1)).await;

        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
        assert_eq!(event_cache_weak.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Give the tasks a chance to shut down and release their references.
        sleep(Duration::from_secs(1)).await;

        assert_eq!(
            event_cache_weak.strong_count(),
            0,
            "Too many strong references to the event cache {}",
            event_cache_weak.strong_count()
        );
    }
}