1#![forbid(missing_docs)]
29
30use std::{
31 collections::HashMap,
32 fmt,
33 ops::{Deref, DerefMut},
34 sync::{Arc, OnceLock},
35};
36
37use eyeball::SharedObservable;
38use futures_util::future::{join_all, try_join_all};
39use matrix_sdk_base::{
40 ThreadingSupport,
41 cross_process_lock::CrossProcessLockError,
42 event_cache::store::{EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockState},
43 linked_chunk::lazy_loader::LazyLoaderError,
44 sync::RoomUpdates,
45 task_monitor::BackgroundTaskHandle,
46 timer,
47};
48use ruma::{OwnedRoomId, RoomId};
49use tokio::sync::{
50 Mutex, RwLock,
51 broadcast::{Receiver, Sender, channel},
52 mpsc,
53};
54use tracing::{error, instrument, trace};
55
56use crate::{
57 Client,
58 client::{ClientInner, WeakClient},
59};
60
61mod caches;
62mod deduplicator;
63mod persistence;
64#[cfg(feature = "e2e-encryption")]
65mod redecryptor;
66mod tasks;
67
68use caches::room::{RoomEventCacheLinkedChunkUpdate, RoomEventCacheStateLock};
69pub use caches::{
70 TimelineVectorDiffs,
71 pagination::{BackPaginationOutcome, PaginationStatus},
72 room::{
73 RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheSubscriber,
74 RoomEventCacheUpdate, pagination::RoomPagination,
75 },
76};
77#[cfg(feature = "e2e-encryption")]
78pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport};
79
/// All the errors that can happen when manipulating the event cache.
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] hasn't subscribed to sync responses yet; call
    /// [`EventCache::subscribe`] before using it.
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// The room hasn't been found in the client's known rooms.
    #[error("Room `{room_id}` is not found.")]
    RoomNotFound {
        /// The identifier of the room that wasn't found.
        room_id: OwnedRoomId,
    },

    /// An error happened while running a back-pagination request.
    #[error(transparent)]
    BackpaginationError(Box<crate::Error>),

    /// A back-pagination was requested while another one was already running.
    #[error("We were already back-paginating.")]
    AlreadyBackpaginating,

    /// An error coming from the underlying event cache store.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happened while acquiring the cross-process lock guarding the
    /// event cache store.
    #[error(transparent)]
    LockingStorage(#[from] CrossProcessLockError),

    /// The owning [`Client`] has been dropped, so the internal weak reference
    /// could not be upgraded.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happened while lazily loading a linked chunk from the store.
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),

    /// None of a room's pinned events could be loaded.
    #[error("Unable to load any of the pinned events.")]
    UnableToLoadPinnedEvents,

    /// The linked chunk metadata read from the store is inconsistent.
    #[error("the linked chunk metadata is invalid: {details}")]
    InvalidLinkedChunkMetadata {
        /// Human-readable description of the inconsistency.
        details: String,
    },
}
141
/// A `Result` specialized to [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;
144
/// Handles for the background tasks spawned by [`EventCache::subscribe`];
/// dropping this aborts those tasks (see the `Drop` impl below).
pub struct EventCacheDropHandles {
    /// Task forwarding room updates from sync into the cache.
    listen_updates_task: BackgroundTaskHandle,

    /// Task reacting to changes of the ignored-user list.
    ignore_user_list_update_task: BackgroundTaskHandle,

    /// Task that shrinks a room's linked chunk when it's no longer observed.
    auto_shrink_linked_chunk_task: BackgroundTaskHandle,

    /// Kept alive so re-decryption keeps running; aborts its own tasks on drop.
    #[cfg(feature = "e2e-encryption")]
    _redecryptor: redecryptor::Redecryptor,
}
160
impl fmt::Debug for EventCacheDropHandles {
    // Task handles carry no useful debug information, so only print the type name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
    }
}
166
impl Drop for EventCacheDropHandles {
    // Abort every background task spawned by `EventCache::subscribe()` so they
    // don't outlive the event cache.
    fn drop(&mut self) {
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
        self.auto_shrink_linked_chunk_task.abort();
    }
}
174
/// An event cache, providing lightweight, in-memory and on-disk storage of the
/// events seen by a client, per room. Cheap to clone: all clones share the
/// same inner state.
#[derive(Clone)]
pub struct EventCache {
    /// Shared state, reference-counted so clones and background tasks can hold it.
    inner: Arc<EventCacheInner>,
}
185
impl fmt::Debug for EventCache {
    // The inner state isn't `Debug`; only print the type name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCache").finish_non_exhaustive()
    }
}
191
192impl EventCache {
193 pub(crate) fn new(client: &Arc<ClientInner>, event_cache_store: EventCacheStoreLock) -> Self {
195 let (generic_update_sender, _) = channel(128);
196 let (linked_chunk_update_sender, _) = channel(128);
197
198 let weak_client = WeakClient::from_inner(client);
199
200 let (thread_subscriber_sender, thread_subscriber_receiver) = channel(128);
201 let thread_subscriber_task = client
202 .task_monitor
203 .spawn_background_task(
204 "event_cache::thread_subscriber",
205 tasks::thread_subscriber_task(
206 weak_client.clone(),
207 linked_chunk_update_sender.clone(),
208 thread_subscriber_sender,
209 ),
210 )
211 .abort_on_drop();
212
213 #[cfg(feature = "experimental-search")]
214 let search_indexing_task = client
215 .task_monitor
216 .spawn_background_task(
217 "event_cache::search_indexing",
218 tasks::search_indexing_task(
219 weak_client.clone(),
220 linked_chunk_update_sender.clone(),
221 ),
222 )
223 .abort_on_drop();
224
225 #[cfg(feature = "e2e-encryption")]
226 let redecryption_channels = redecryptor::RedecryptorChannels::new();
227
228 Self {
229 inner: Arc::new(EventCacheInner {
230 client: weak_client,
231 config: RwLock::new(EventCacheConfig::default()),
232 store: event_cache_store,
233 multiple_room_updates_lock: Default::default(),
234 by_room: Default::default(),
235 drop_handles: Default::default(),
236 auto_shrink_sender: Default::default(),
237 generic_update_sender,
238 linked_chunk_update_sender,
239 _thread_subscriber_task: thread_subscriber_task,
240 #[cfg(feature = "experimental-search")]
241 _search_indexing_task: search_indexing_task,
242 #[cfg(feature = "e2e-encryption")]
243 redecryption_channels,
244 thread_subscriber_receiver,
245 }),
246 }
247 }
248
249 pub async fn config(&self) -> impl Deref<Target = EventCacheConfig> + '_ {
252 self.inner.config.read().await
253 }
254
255 pub async fn config_mut(&self) -> impl DerefMut<Target = EventCacheConfig> + '_ {
257 self.inner.config.write().await
258 }
259
260 #[doc(hidden)]
264 pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
265 self.inner.thread_subscriber_receiver.resubscribe()
266 }
267
268 pub fn subscribe(&self) -> Result<()> {
274 let client = self.inner.client()?;
275
276 let _ = self.inner.drop_handles.get_or_init(|| {
278 let task_monitor = client.task_monitor();
279
280 let listen_updates_task = task_monitor.spawn_background_task("event_cache::room_updates_task", tasks::room_updates_task(
282 self.inner.clone(),
283 client.subscribe_to_all_room_updates(),
284 ));
285
286 let ignore_user_list_update_task = task_monitor.spawn_background_task("event_cache::ignore_user_list_update_task", tasks::ignore_user_list_update_task(
287 self.inner.clone(),
288 client.subscribe_to_ignore_user_list_changes(),
289 ));
290
291 let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);
292
293 self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);
295
296 let auto_shrink_linked_chunk_task = task_monitor.spawn_background_task("event_cache::auto_shrink_linked_chunk_task", tasks::auto_shrink_linked_chunk_task(
297 Arc::downgrade(&self.inner),
298 auto_shrink_receiver,
299 ));
300
301 #[cfg(feature = "e2e-encryption")]
302 let redecryptor = {
303 let receiver = self
304 .inner
305 .redecryption_channels
306 .decryption_request_receiver
307 .lock()
308 .take()
309 .expect("We should have initialized the channel an subscribing should happen only once");
310
311 redecryptor::Redecryptor::new(&client, Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender)
312 };
313
314
315 Arc::new(EventCacheDropHandles {
316 listen_updates_task,
317 ignore_user_list_update_task,
318 auto_shrink_linked_chunk_task,
319 #[cfg(feature = "e2e-encryption")]
320 _redecryptor: redecryptor,
321 })
322 });
323
324 Ok(())
325 }
326
327 #[doc(hidden)]
329 pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
330 self.inner.handle_room_updates(updates).await
331 }
332
333 pub fn has_subscribed(&self) -> bool {
335 self.inner.drop_handles.get().is_some()
336 }
337
338 pub(crate) async fn for_room(
340 &self,
341 room_id: &RoomId,
342 ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
343 let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
344 return Err(EventCacheError::NotSubscribedYet);
345 };
346
347 let room = self.inner.for_room(room_id).await?;
348
349 Ok((room, drop_handles))
350 }
351
352 pub async fn clear_all_rooms(&self) -> Result<()> {
356 self.inner.clear_all_rooms().await
357 }
358
359 pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
369 self.inner.generic_update_sender.subscribe()
370 }
371}
372
/// Configuration knobs for the [`EventCache`].
#[derive(Clone, Copy, Debug)]
pub struct EventCacheConfig {
    /// Maximum number of concurrent requests used when loading pinned events.
    pub max_pinned_events_concurrent_requests: usize,

    /// Maximum number of pinned events to load for a room.
    pub max_pinned_events_to_load: usize,
}
382
impl EventCacheConfig {
    /// Default value for [`EventCacheConfig::max_pinned_events_to_load`].
    const DEFAULT_MAX_EVENTS_TO_LOAD: usize = 128;

    /// Default value for
    /// [`EventCacheConfig::max_pinned_events_concurrent_requests`].
    const DEFAULT_MAX_CONCURRENT_REQUESTS: usize = 8;
}
391
392impl Default for EventCacheConfig {
393 fn default() -> Self {
394 Self {
395 max_pinned_events_concurrent_requests: Self::DEFAULT_MAX_CONCURRENT_REQUESTS,
396 max_pinned_events_to_load: Self::DEFAULT_MAX_EVENTS_TO_LOAD,
397 }
398 }
399}
400
/// State shared by all clones of an [`EventCache`] and its background tasks.
struct EventCacheInner {
    /// A weak reference to the owning client; weak so the event cache's
    /// background tasks don't keep the client alive.
    client: WeakClient,

    /// The user-tunable configuration.
    config: RwLock<EventCacheConfig>,

    /// Handle to the (possibly cross-process-locked) event cache store.
    store: EventCacheStoreLock,

    /// Lock serializing the handling of whole batches of room updates, so two
    /// concurrent `handle_room_updates` calls can't interleave.
    multiple_room_updates_lock: Mutex<()>,

    /// Per-room caches, created lazily by `for_room`.
    by_room: RwLock<HashMap<OwnedRoomId, RoomEventCache>>,

    /// Handles of the tasks spawned by `EventCache::subscribe()`; also serves
    /// as the "has subscribed" marker.
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// Sender used by room caches to request auto-shrinking of their linked
    /// chunk; initialized in `EventCache::subscribe()`.
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,

    /// Broadcasts coarse "something changed in this room" notifications.
    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

    /// Broadcasts fine-grained linked chunk updates to interested tasks
    /// (thread subscriber, search indexing, re-decryption).
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// Keeps the thread subscriber task alive (aborted on drop).
    _thread_subscriber_task: BackgroundTaskHandle,

    /// Keeps the search indexing task alive (aborted on drop).
    #[cfg(feature = "experimental-search")]
    _search_indexing_task: BackgroundTaskHandle,

    /// Receiver kept so `subscribe_thread_subscriber_updates` can hand out
    /// fresh receivers via `resubscribe()`.
    thread_subscriber_receiver: Receiver<()>,

    /// Channels used to communicate with the re-decryption task.
    #[cfg(feature = "e2e-encryption")]
    redecryption_channels: redecryptor::RedecryptorChannels,
}
476
/// Payload of the auto-shrink channel: the room to try shrinking.
type AutoShrinkChannelPayload = OwnedRoomId;
478
479impl EventCacheInner {
480 fn client(&self) -> Result<Client> {
481 self.client.get().ok_or(EventCacheError::ClientDropped)
482 }
483
484 async fn clear_all_rooms(&self) -> Result<()> {
486 let rooms = self.by_room.write().await;
526
527 let room_locks =
530 join_all(rooms.values().map(|room| async move { (room, room.state().write().await) }))
531 .await;
532
533 let store_guard = match self.store.lock().await? {
535 EventCacheStoreLockState::Clean(store_guard) => store_guard,
536 EventCacheStoreLockState::Dirty(store_guard) => store_guard,
537 };
538 store_guard.clear_all_linked_chunks().await?;
539
540 try_join_all(room_locks.into_iter().map(|(room, state_guard)| async move {
544 let mut state_guard = state_guard?;
545 let updates_as_vector_diffs = state_guard.reset().await?;
546
547 room.update_sender().send(
548 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
549 diffs: updates_as_vector_diffs,
550 origin: EventsOrigin::Cache,
551 }),
552 Some(RoomEventCacheGenericUpdate { room_id: room.room_id().to_owned() }),
553 );
554
555 Ok::<_, EventCacheError>(())
556 }))
557 .await?;
558
559 Ok(())
560 }
561
562 #[instrument(skip(self, updates))]
564 async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
565 let _lock = {
568 let _timer = timer!("Taking the `multiple_room_updates_lock`");
569 self.multiple_room_updates_lock.lock().await
570 };
571
572 for (room_id, left_room_update) in updates.left {
579 let Ok(room) = self.for_room(&room_id).await else {
580 error!(?room_id, "Room must exist");
581 continue;
582 };
583
584 if let Err(err) = room.handle_left_room_update(left_room_update).await {
585 error!("handling left room update: {err}");
587 }
588 }
589
590 for (room_id, joined_room_update) in updates.joined {
592 trace!(?room_id, "Handling a `JoinedRoomUpdate`");
593
594 let Ok(room) = self.for_room(&room_id).await else {
595 error!(?room_id, "Room must exist");
596 continue;
597 };
598
599 if let Err(err) = room.handle_joined_room_update(joined_room_update).await {
600 error!(%room_id, "handling joined room update: {err}");
602 }
603 }
604
605 Ok(())
609 }
610
611 async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
613 let by_room_guard = self.by_room.read().await;
616
617 match by_room_guard.get(room_id) {
618 Some(room) => Ok(room.clone()),
619
620 None => {
621 drop(by_room_guard);
623 let mut by_room_guard = self.by_room.write().await;
624
625 if let Some(room) = by_room_guard.get(room_id) {
628 return Ok(room.clone());
629 }
630
631 let pagination_status =
632 SharedObservable::new(PaginationStatus::Idle { hit_timeline_start: false });
633
634 let Some(client) = self.client.get() else {
635 return Err(EventCacheError::ClientDropped);
636 };
637
638 let room = client
639 .get_room(room_id)
640 .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
641 let room_version_rules = room.clone_info().room_version_rules_or_default();
642
643 let enabled_thread_support = matches!(
644 client.base_client().threading_support,
645 ThreadingSupport::Enabled { .. }
646 );
647
648 let update_sender = caches::room::RoomEventCacheUpdateSender::new(
649 self.generic_update_sender.clone(),
650 );
651
652 let own_user_id =
653 client.user_id().expect("the user must be logged in, at this point").to_owned();
654 let room_state = RoomEventCacheStateLock::new(
655 own_user_id,
656 room_id.to_owned(),
657 room_version_rules,
658 enabled_thread_support,
659 update_sender.clone(),
660 self.linked_chunk_update_sender.clone(),
661 self.store.clone(),
662 pagination_status.clone(),
663 )
664 .await?;
665
666 let timeline_is_not_empty =
667 room_state.read().await?.room_linked_chunk().revents().next().is_some();
668
669 let auto_shrink_sender =
672 self.auto_shrink_sender.get().cloned().expect(
673 "we must have called `EventCache::subscribe()` before calling here.",
674 );
675
676 let room_event_cache = RoomEventCache::new(
677 self.client.clone(),
678 room_state,
679 pagination_status,
680 room_id.to_owned(),
681 auto_shrink_sender,
682 update_sender,
683 );
684
685 by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
686
687 if timeline_is_not_empty {
690 let _ = self
691 .generic_update_sender
692 .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
693 }
694
695 Ok(room_event_cache)
696 }
697 }
698 }
699}
700
/// Where did a batch of events come from?
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events came from a sync response.
    Sync,

    /// Events came from a pagination request.
    Pagination,

    /// Events came from the cache itself (e.g. a reset or reload).
    Cache,
}
713
#[cfg(test)]
mod tests {
    use std::{ops::Not, sync::Arc, time::Duration};

    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::{
        RoomState,
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
    };
    use matrix_sdk_test::{
        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
    };
    use ruma::{event_id, room_id, serde::Raw, user_id};
    use serde_json::json;
    use tokio::time::sleep;

    use super::{EventCacheError, RoomEventCacheGenericUpdate, RoomEventCacheUpdate};
    use crate::test_utils::{
        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
    };

    // `for_room` must fail until `EventCache::subscribe()` has been called.
    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    // Many duplicate read-marker account data events must result in a single
    // `MoveReadMarkerTo` update, and no generic update.
    #[async_test]
    async fn test_uniq_read_marker() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();

        assert!(events.is_empty());

        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        // 100 identical read markers should be deduplicated into one update.
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
            .await
            .unwrap();

        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        // No further room update, and no generic update for account data.
        assert!(stream.recv().now_or_never().is_none());

        assert!(generic_stream.recv().now_or_never().is_none());
    }

    // Events stored for one room must be findable there, and not leak into
    // another room's cache.
    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
        client.base_client().get_or_create_room(room_id2, RoomState::Joined);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
        updates.joined.insert(room_id2.to_owned(), joined_room_update2);

        event_cache.inner.handle_room_updates(updates).await.unwrap();

        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.find_event(eid1).await.unwrap().unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.find_event(eid2).await.unwrap().unwrap();
        assert_event_matches_msg(&found2, "you");

        // eid3 belongs to room 2, so it must not be visible from room 1.
        assert!(room_event_cache.find_event(eid3).await.unwrap().is_none());
    }

    // Manually saved events must become findable in the room's cache.
    #[async_test]
    async fn test_save_event() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;

        assert!(room_event_cache.find_event(event_id).await.unwrap().is_some());
    }

    // Creating a room cache must emit a generic update only when the room's
    // linked chunk loaded from the store is non-empty.
    #[async_test]
    async fn test_generic_update_when_loading_rooms() {
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id_0 = room_id!("!raclette:patate.ch");
        let room_id_1 = room_id!("!fondue:patate.ch");

        let event_factory = EventFactory::new().room(room_id_0).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        // Pre-populate the store for room 0 with a single event.
        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id_0),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Room 0 has stored events: a generic update is expected.
        {
            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();

            assert_matches!(
                generic_stream.recv().await,
                Ok(RoomEventCacheGenericUpdate { room_id }) => {
                    assert_eq!(room_id, room_id_0);
                }
            );
        }

        // Room 1 is empty: no generic update.
        {
            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();

            assert!(generic_stream.recv().now_or_never().is_none());
        }
    }

    // Back-paginating from the store must emit a generic update only when
    // events are actually produced.
    #[async_test]
    async fn test_generic_update_when_paginating_room() {
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_factory = EventFactory::new().room(room_id).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        // Pre-populate the store with several chunks, including empty ones, so
        // successive paginations exercise the "no events loaded" case.
        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(2)),
                        new: ChunkIdentifier::new(3),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(3), 0),
                        items: vec![
                            event_factory
                                .text_msg("world")
                                .sender(user)
                                .event_id(event_id!("$ev1"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        // Initial load of the non-empty room triggers a generic update.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let pagination = room_event_cache.pagination();

        // First pagination loads one event: generic update expected.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert_eq!(pagination_outcome.events.len(), 1);
        assert!(pagination_outcome.reached_start.not());
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        // Second pagination loads an empty chunk: no generic update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.events.is_empty());
        assert!(pagination_outcome.reached_start.not());
        assert!(generic_stream.recv().now_or_never().is_none());

        // Third pagination reaches the start, still without events.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.reached_start);
        assert!(generic_stream.recv().now_or_never().is_none());
    }

    // `for_room` must fail with `RoomNotFound` for an unknown room, then
    // succeed once the room exists client-side.
    #[async_test]
    async fn test_for_room_when_room_is_not_found() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        assert_matches!(
            event_cache.for_room(room_id).await,
            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
                assert_eq!(room_id, not_found_room_id);
            }
        );

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        assert!(event_cache.for_room(room_id).await.is_ok());
    }

    // The event cache's background tasks must not keep a strong reference
    // cycle alive: dropping the client must drop the cache.
    #[cfg(not(target_family = "wasm"))]
    #[async_test]
    async fn test_no_refcycle_event_cache_tasks() {
        let client = MockClientBuilder::new(None).build().await;

        // Give spawned tasks a chance to start before counting references.
        sleep(Duration::from_secs(1)).await;

        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
        assert_eq!(event_cache_weak.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Let the aborted tasks wind down before re-counting.
        sleep(Duration::from_secs(1)).await;

        assert_eq!(
            event_cache_weak.strong_count(),
            0,
            "Too many strong references to the event cache {}",
            event_cache_weak.strong_count()
        );
    }
}