1#![forbid(missing_docs)]
29
30use std::{
31 collections::HashMap,
32 fmt,
33 sync::{Arc, OnceLock, RwLock as StdRwLock, RwLockReadGuard, RwLockWriteGuard},
34};
35
36use futures_util::future::try_join_all;
37use matrix_sdk_base::{
38 cross_process_lock::CrossProcessLockError,
39 event_cache::store::{EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockState},
40 linked_chunk::lazy_loader::LazyLoaderError,
41 sync::RoomUpdates,
42 task_monitor::BackgroundTaskHandle,
43 timer,
44};
45use ruma::{OwnedRoomId, RoomId};
46use tokio::sync::{
47 Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock,
48 broadcast::{Receiver, Sender, channel},
49 mpsc,
50};
51use tracing::{error, instrument, trace};
52
53use crate::{
54 Client,
55 client::{ClientInner, WeakClient},
56 paginators::PaginatorError,
57};
58
59mod automatic_pagination;
60mod caches;
61mod deduplicator;
62mod persistence;
63#[cfg(feature = "e2e-encryption")]
64mod redecryptor;
65mod tasks;
66
67use caches::{Caches, room::RoomEventCacheLinkedChunkUpdate};
68pub use caches::{
69 TimelineVectorDiffs,
70 event_focused::EventFocusThreadMode,
71 pagination::{BackPaginationOutcome, PaginationStatus},
72 room::{
73 RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheSubscriber,
74 RoomEventCacheUpdate, pagination::RoomPagination,
75 },
76 thread::pagination::ThreadPagination,
77};
78#[cfg(feature = "e2e-encryption")]
79pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport};
80
81pub use crate::event_cache::automatic_pagination::AutomaticPagination;
82
/// An error observed in the event cache.
#[derive(thiserror::Error, Clone, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] instance hasn't subscribed to sync responses yet;
    /// call [`EventCache::subscribe()`] first.
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// The room identified by `room_id` isn't known to the client.
    #[error("Room `{room_id}` is not found.")]
    RoomNotFound {
        /// The identifier of the room that wasn't found.
        room_id: OwnedRoomId,
    },

    /// An error happened while paginating; wraps the underlying client error.
    /// Stored in an [`Arc`] so this error type stays `Clone`.
    #[error(transparent)]
    PaginationError(Arc<crate::Error>),

    /// An error happened during an initial pagination, forwarded from the
    /// paginator.
    #[error(transparent)]
    InitialPaginationError(#[from] PaginatorError),

    /// An error happened at the event cache storage layer.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happened while acquiring the cross-process lock protecting the
    /// storage.
    #[error(transparent)]
    LockingStorage(#[from] CrossProcessLockError),

    /// The [`Client`] owning this event cache has been dropped, so the cache
    /// can't operate anymore.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happened while lazily loading a linked chunk.
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),

    /// None of the pinned events could be loaded.
    #[error("Unable to load any of the pinned events.")]
    UnableToLoadPinnedEvents,

    /// The metadata of a linked chunk was found to be invalid; `details`
    /// carries a human-readable explanation.
    #[error("the linked chunk metadata is invalid: {details}")]
    InvalidLinkedChunkMetadata {
        /// Human-readable description of the inconsistency.
        details: String,
    },
}

/// A result type using [`EventCacheError`] as its error variant.
pub type Result<T> = std::result::Result<T, EventCacheError>;
146
/// Holds the abort-on-drop handles for the background tasks spawned by
/// [`EventCache::subscribe()`]; dropping this aborts those tasks.
pub struct EventCacheDropHandles {
    /// Task that listens to all room updates coming from sync.
    _listen_updates_task: BackgroundTaskHandle,

    /// Task that listens to changes of the ignored-users list.
    _ignore_user_list_update_task: BackgroundTaskHandle,

    /// Task that shrinks room linked chunks when asked to via the auto-shrink
    /// channel.
    _auto_shrink_linked_chunk_task: BackgroundTaskHandle,

    /// Task handling thread subscriptions.
    _thread_subscriber_task: BackgroundTaskHandle,

    /// Task feeding the search index from linked-chunk updates.
    #[cfg(feature = "experimental-search")]
    _search_indexing_task: BackgroundTaskHandle,

    /// Handle to the redecryptor, which retries decrypting events when new
    /// room keys arrive.
    #[cfg(feature = "e2e-encryption")]
    _redecryptor: redecryptor::Redecryptor,
}
179
180impl fmt::Debug for EventCacheDropHandles {
181 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182 f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
183 }
184}
185
/// An event cache, providing shared access to cached events and per-room
/// event caches.
///
/// Cloning is cheap: all clones share the same inner state.
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the shared inner state.
    inner: Arc<EventCacheInner>,
}
196
197impl fmt::Debug for EventCache {
198 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199 f.debug_struct("EventCache").finish_non_exhaustive()
200 }
201}
202
203impl EventCache {
204 pub(crate) fn new(client: &Arc<ClientInner>, event_cache_store: EventCacheStoreLock) -> Self {
206 let (generic_update_sender, _) = channel(128);
207 let (linked_chunk_update_sender, _) = channel(128);
208
209 let weak_client = WeakClient::from_inner(client);
210
211 let (thread_subscriber_sender, _thread_subscriber_receiver) = channel(128);
212
213 #[cfg(feature = "e2e-encryption")]
214 let redecryption_channels = redecryptor::RedecryptorChannels::new();
215
216 Self {
217 inner: Arc::new(EventCacheInner {
218 client: weak_client,
219 config: StdRwLock::new(EventCacheConfig::default()),
220 store: event_cache_store,
221 multiple_room_updates_lock: Default::default(),
222 by_room: Default::default(),
223 drop_handles: Default::default(),
224 auto_shrink_sender: Default::default(),
225 generic_update_sender,
226 linked_chunk_update_sender,
227 #[cfg(feature = "e2e-encryption")]
228 redecryption_channels,
229 automatic_pagination: OnceLock::new(),
230 thread_subscriber_sender,
231 }),
232 }
233 }
234
235 pub fn config(&self) -> RwLockReadGuard<'_, EventCacheConfig> {
238 self.inner.config.read().unwrap()
239 }
240
241 pub fn config_mut(&self) -> RwLockWriteGuard<'_, EventCacheConfig> {
243 self.inner.config.write().unwrap()
244 }
245
246 #[cfg(feature = "testing")]
250 pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
251 self.inner.thread_subscriber_sender.subscribe()
252 }
253
254 pub fn subscribe(&self) -> Result<()> {
260 let client = self.inner.client()?;
261
262 let _ = self.inner.drop_handles.get_or_init(|| {
264 let task_monitor = client.task_monitor();
265
266 let listen_updates_task = task_monitor.spawn_infinite_task("event_cache::room_updates_task", tasks::room_updates_task(
268 self.inner.clone(),
269 client.subscribe_to_all_room_updates(),
270 )).abort_on_drop();
271
272 let ignore_user_list_update_task = task_monitor.spawn_infinite_task("event_cache::ignore_user_list_update_task", tasks::ignore_user_list_update_task(
273 self.inner.clone(),
274 client.subscribe_to_ignore_user_list_changes(),
275 )).abort_on_drop();
276
277 let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);
278
279 self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);
281
282 let auto_shrink_linked_chunk_task = task_monitor.spawn_infinite_task("event_cache::auto_shrink_linked_chunk_task", tasks::auto_shrink_linked_chunk_task(
283 Arc::downgrade(&self.inner),
284 auto_shrink_receiver,
285 )).abort_on_drop();
286
287 #[cfg(feature = "e2e-encryption")]
288 let redecryptor = {
289 let receiver = self
290 .inner
291 .redecryption_channels
292 .decryption_request_receiver
293 .lock()
294 .take()
295 .expect("We should have initialized the channel an subscribing should happen only once");
296
297 redecryptor::Redecryptor::new(&client, Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender)
298 };
299
300 let thread_subscriber_task = client
301 .task_monitor()
302 .spawn_infinite_task(
303 "event_cache::thread_subscriber",
304 tasks::thread_subscriber_task(
305 self.inner.client.clone(),
306 self.inner.linked_chunk_update_sender.clone(),
307 self.inner.thread_subscriber_sender.clone(),
308 ),
309 )
310 .abort_on_drop();
311
312 #[cfg(feature = "experimental-search")]
313 let search_indexing_task = client
314 .task_monitor()
315 .spawn_infinite_task(
316 "event_cache::search_indexing",
317 tasks::search_indexing_task(
318 self.inner.client.clone(),
319 self.inner.linked_chunk_update_sender.clone(),
320 ),
321 )
322 .abort_on_drop();
323
324 if self.config().experimental_auto_backpagination {
325 trace!("spawning the automatic paginations API");
328 self.inner.automatic_pagination.get_or_init(|| AutomaticPagination::new(Arc::downgrade(&self.inner), task_monitor));
329 } else {
330 trace!("automatic paginations API is disabled");
331 }
332
333 Arc::new(EventCacheDropHandles {
334 _listen_updates_task: listen_updates_task,
335 _ignore_user_list_update_task: ignore_user_list_update_task,
336 _auto_shrink_linked_chunk_task: auto_shrink_linked_chunk_task,
337 #[cfg(feature = "e2e-encryption")]
338 _redecryptor: redecryptor,
339 _thread_subscriber_task: thread_subscriber_task,
340 #[cfg(feature = "experimental-search")]
341 _search_indexing_task: search_indexing_task,
342 })
343 });
344
345 Ok(())
346 }
347
348 #[doc(hidden)]
350 pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
351 self.inner.handle_room_updates(updates).await
352 }
353
354 pub fn has_subscribed(&self) -> bool {
356 self.inner.drop_handles.get().is_some()
357 }
358
359 pub(crate) async fn for_room(
361 &self,
362 room_id: &RoomId,
363 ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
364 let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
365 return Err(EventCacheError::NotSubscribedYet);
366 };
367
368 let room = self.inner.all_caches_for_room(room_id).await?.room.clone();
369
370 Ok((room, drop_handles))
371 }
372
373 pub async fn clear_all_rooms(&self) -> Result<()> {
377 self.inner.clear_all_rooms().await
378 }
379
380 pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
390 self.inner.generic_update_sender.subscribe()
391 }
392
393 pub fn automatic_pagination(&self) -> Option<AutomaticPagination> {
397 self.inner.automatic_pagination.get().cloned()
398 }
399}
400
/// Tunable knobs for the [`EventCache`].
#[derive(Clone, Copy, Debug)]
pub struct EventCacheConfig {
    /// Maximum number of concurrent requests when loading pinned events.
    pub max_pinned_events_concurrent_requests: usize,

    /// Maximum number of pinned events to load.
    pub max_pinned_events_to_load: usize,

    /// Whether the experimental automatic back-pagination API is enabled;
    /// checked when [`EventCache::subscribe()`] runs.
    pub experimental_auto_backpagination: bool,

    /// Pagination credit granted to each room by the automatic pagination
    /// machinery.
    pub room_pagination_per_room_credit: usize,

    /// Number of events requested per room back-pagination batch.
    pub room_pagination_batch_size: u16,
}
431
impl EventCacheConfig {
    /// Default value for [`EventCacheConfig::max_pinned_events_to_load`].
    pub const DEFAULT_MAX_EVENTS_TO_LOAD: usize = 128;

    /// Default value for
    /// [`EventCacheConfig::max_pinned_events_concurrent_requests`].
    pub const DEFAULT_MAX_CONCURRENT_REQUESTS: usize = 8;

    /// Default value for
    /// [`EventCacheConfig::room_pagination_per_room_credit`].
    pub const DEFAULT_ROOM_PAGINATION_CREDITS: usize = 20;

    /// Default value for [`EventCacheConfig::room_pagination_batch_size`].
    pub const DEFAULT_ROOM_PAGINATION_BATCH_SIZE: u16 = 30;
}
450
451impl Default for EventCacheConfig {
452 fn default() -> Self {
453 Self {
454 max_pinned_events_concurrent_requests: Self::DEFAULT_MAX_CONCURRENT_REQUESTS,
455 max_pinned_events_to_load: Self::DEFAULT_MAX_EVENTS_TO_LOAD,
456 room_pagination_per_room_credit: Self::DEFAULT_ROOM_PAGINATION_CREDITS,
457 room_pagination_batch_size: Self::DEFAULT_ROOM_PAGINATION_BATCH_SIZE,
458 experimental_auto_backpagination: false,
459 }
460 }
461}
462
/// All the per-room caches, indexed by room identifier.
type CachesByRoom = HashMap<OwnedRoomId, Caches>;

/// Shared state of the [`EventCache`], behind an `Arc`.
struct EventCacheInner {
    /// A weak reference to the owning client; weak so the event cache doesn't
    /// keep the client alive (see `test_no_refcycle_event_cache_tasks`).
    client: WeakClient,

    /// The current configuration.
    config: StdRwLock<EventCacheConfig>,

    /// Handle to the event cache storage.
    store: EventCacheStoreLock,

    /// Lock serializing the handling of batches of room updates, so two
    /// concurrent batches can't interleave.
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-populated per-room caches.
    by_room: Arc<RwLock<CachesByRoom>>,

    /// Handles for the background tasks; set once by
    /// [`EventCache::subscribe()`].
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// Sender used to request the auto-shrink of a room's linked chunk; set
    /// once by [`EventCache::subscribe()`].
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,

    /// Broadcasts generic "something changed in this room" updates.
    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

    /// Broadcasts fine-grained linked-chunk updates.
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// Notifies (testing) observers that the thread subscriber task ran.
    thread_subscriber_sender: Sender<()>,

    /// Channels used to communicate with the redecryptor task.
    #[cfg(feature = "e2e-encryption")]
    redecryption_channels: redecryptor::RedecryptorChannels,

    /// The automatic pagination handle, set at subscription time when enabled
    /// in the configuration.
    automatic_pagination: OnceLock<AutomaticPagination>,
}
534
535type AutoShrinkChannelPayload = OwnedRoomId;
536
537impl EventCacheInner {
538 fn client(&self) -> Result<Client> {
539 self.client.get().ok_or(EventCacheError::ClientDropped)
540 }
541
542 async fn clear_all_rooms(&self) -> Result<()> {
544 let mut all_caches = self.by_room.write().await;
585
586 let resets =
589 try_join_all(all_caches.values_mut().map(|caches| caches.prepare_to_reset())).await?;
590
591 let store_guard = match self.store.lock().await? {
593 EventCacheStoreLockState::Clean(store_guard) => store_guard,
594 EventCacheStoreLockState::Dirty(store_guard) => store_guard,
595 };
596 store_guard.clear_all_linked_chunks().await?;
597
598 try_join_all(resets.into_iter().map(|reset_cache| reset_cache.reset_all())).await?;
601
602 Ok(())
603 }
604
605 #[instrument(skip(self, updates))]
607 async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
608 let _lock = {
611 let _timer = timer!("Taking the `multiple_room_updates_lock`");
612 self.multiple_room_updates_lock.lock().await
613 };
614
615 for (room_id, left_room_update) in updates.left {
622 let Ok(caches) = self.all_caches_for_room(&room_id).await else {
623 error!(?room_id, "Room must exist");
624 continue;
625 };
626
627 if let Err(err) = caches.handle_left_room_update(left_room_update).await {
628 error!("handling left room update: {err}");
630 }
631 }
632
633 for (room_id, joined_room_update) in updates.joined {
635 trace!(?room_id, "Handling a `JoinedRoomUpdate`");
636
637 let Ok(caches) = self.all_caches_for_room(&room_id).await else {
638 error!(?room_id, "Room must exist");
639 continue;
640 };
641
642 if let Err(err) = caches.handle_joined_room_update(joined_room_update).await {
643 error!(%room_id, "handling joined room update: {err}");
645 }
646 }
647
648 Ok(())
652 }
653
654 async fn all_caches_for_room(
656 &self,
657 room_id: &RoomId,
658 ) -> Result<OwnedRwLockReadGuard<CachesByRoom, Caches>> {
659 match OwnedRwLockReadGuard::try_map(self.by_room.clone().read_owned().await, |by_room| {
662 by_room.get(room_id)
663 }) {
664 Ok(caches) => Ok(caches),
665
666 Err(by_room_guard) => {
667 drop(by_room_guard);
669 let by_room_guard = self.by_room.clone().write_owned().await;
670
671 let mut by_room_guard =
674 match OwnedRwLockWriteGuard::try_downgrade_map(by_room_guard, |by_room| {
675 by_room.get(room_id)
676 }) {
677 Ok(caches) => return Ok(caches),
678 Err(by_room_guard) => by_room_guard,
679 };
680
681 let caches = Caches::new(
682 &self.client,
683 room_id,
684 self.generic_update_sender.clone(),
685 self.linked_chunk_update_sender.clone(),
686 self.auto_shrink_sender.get().cloned().expect(
689 "we must have called `EventCache::subscribe()` before calling here.",
690 ),
691 self.store.clone(),
692 self.automatic_pagination.get().cloned(),
693 )
694 .await?;
695
696 by_room_guard.insert(room_id.to_owned(), caches);
697
698 Ok(OwnedRwLockWriteGuard::try_downgrade_map(by_room_guard, |by_room| {
699 by_room.get(room_id)
700 })
701 .expect("`Caches` has just been inserted"))
702 }
703 }
704 }
705}
706
/// Where a set of events came from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events came from a sync response.
    Sync,

    /// Events came from a pagination request.
    Pagination,

    /// Events were loaded from the cache itself (i.e. storage).
    Cache,
}
719
#[cfg(test)]
mod tests {
    use std::{ops::Not, sync::Arc, time::Duration};

    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::{
        RoomState,
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
    };
    use matrix_sdk_test::{
        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
    };
    use ruma::{event_id, room_id, user_id};
    use tokio::time::sleep;

    use super::{EventCacheError, RoomEventCacheGenericUpdate};
    use crate::test_utils::{
        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
    };

    // Requesting a per-room cache before `EventCache::subscribe()` must fail
    // with `NotSubscribedYet`.
    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // Room doesn't need to exist: the subscription check happens first.
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    // Events pushed through sync updates are findable by id, but only within
    // the room they were sent to.
    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
        client.base_client().get_or_create_room(room_id2, RoomState::Joined);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        // Two events for room 1…
        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        // …and one for room 2.
        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
        updates.joined.insert(room_id2.to_owned(), joined_room_update2);

        event_cache.inner.handle_room_updates(updates).await.unwrap();

        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        // Room 1's events are found in room 1's cache.
        let found1 = room_event_cache.find_event(eid1).await.unwrap().unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.find_event(eid2).await.unwrap().unwrap();
        assert_event_matches_msg(&found2, "you");

        // Room 2's event isn't found through room 1's cache.
        assert!(room_event_cache.find_event(eid3).await.unwrap().is_none());
    }

    // Events explicitly saved via `save_events` become findable by id.
    #[async_test]
    async fn test_save_event() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;

        assert!(room_event_cache.find_event(event_id).await.unwrap().is_some());
    }

    // Loading a room that has events in storage emits a generic update;
    // loading an empty room doesn't.
    #[async_test]
    async fn test_generic_update_when_loading_rooms() {
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id_0 = room_id!("!raclette:patate.ch");
        let room_id_1 = room_id!("!fondue:patate.ch");

        let event_factory = EventFactory::new().room(room_id_0).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        // Pre-fill the storage for room 0 with one chunk holding one event.
        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id_0),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        {
            // Room 0 has stored events: loading it emits a generic update.
            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();

            assert_matches!(
                generic_stream.recv().await,
                Ok(RoomEventCacheGenericUpdate { room_id }) => {
                    assert_eq!(room_id, room_id_0);
                }
            );
        }

        {
            // Room 1 is empty: loading it emits nothing.
            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();

            assert!(generic_stream.recv().now_or_never().is_none());
        }
    }

    // Back-paginating emits a generic update only when events are produced.
    #[async_test]
    async fn test_generic_update_when_paginating_room() {
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_factory = EventFactory::new().room(room_id).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        // Pre-fill the storage with four chunks: two empty ones, then two
        // chunks holding one event each.
        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(2)),
                        new: ChunkIdentifier::new(3),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(3), 0),
                        items: vec![
                            event_factory
                                .text_msg("world")
                                .sender(user)
                                .event_id(event_id!("$ev1"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Loading the room (last chunk has an event) emits an update.
        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let pagination = room_event_cache.pagination();

        // First pagination yields one event: an update is emitted.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert_eq!(pagination_outcome.events.len(), 1);
        assert!(pagination_outcome.reached_start.not());
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        // Second pagination hits an empty chunk: no events, no update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.events.is_empty());
        assert!(pagination_outcome.reached_start.not());
        assert!(generic_stream.recv().now_or_never().is_none());

        // Third pagination reaches the start: still no update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.reached_start);
        assert!(generic_stream.recv().now_or_never().is_none());
    }

    // `for_room` returns `RoomNotFound` until the room exists.
    #[async_test]
    async fn test_for_room_when_room_is_not_found() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        assert_matches!(
            event_cache.for_room(room_id).await,
            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
                assert_eq!(room_id, not_found_room_id);
            }
        );

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        assert!(event_cache.for_room(room_id).await.is_ok());
    }

    // The background tasks must not keep strong references to the event cache
    // (no reference cycle): dropping the client drops everything.
    #[cfg(not(target_family = "wasm"))]
    #[async_test]
    async fn test_no_refcycle_event_cache_tasks() {
        let client = MockClientBuilder::new(None).build().await;

        // Give spawned tasks a chance to settle.
        sleep(Duration::from_secs(1)).await;

        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
        assert_eq!(event_cache_weak.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            // Create a room and exercise the subscription path.
            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Give the abort-on-drop tasks time to wind down.
        sleep(Duration::from_secs(1)).await;

        assert_eq!(
            event_cache_weak.strong_count(),
            0,
            "Too many strong references to the event cache {}",
            event_cache_weak.strong_count()
        );
    }
}