#![forbid(missing_docs)]
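//! The event cache is an abstraction layer sitting between the Rust SDK and
//! final clients. It subscribes to sync responses and send queue updates,
//! caches each room's events both in memory and in the event cache store, and
//! broadcasts updates (new timeline events, pagination results, etc.) to its
//! observers.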

use std::{
    collections::{BTreeMap, HashMap},
    fmt,
    sync::{Arc, OnceLock, Weak},
};

use eyeball::{SharedObservable, Subscriber};
use eyeball_im::VectorDiff;
use futures_util::future::{join_all, try_join_all};
use matrix_sdk_base::{
    ThreadingSupport,
    cross_process_lock::CrossProcessLockError,
    deserialized_responses::{AmbiguityChange, TimelineEvent},
    event_cache::{
        Gap,
        store::{EventCacheStoreError, EventCacheStoreLock},
    },
    executor::AbortOnDrop,
    linked_chunk::{self, OwnedLinkedChunkId, lazy_loader::LazyLoaderError},
    serde_helpers::extract_thread_root_from_content,
    sync::RoomUpdates,
    timer,
};
use matrix_sdk_common::executor::{JoinHandle, spawn};
use room::RoomEventCacheState;
use ruma::{
    OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId, events::AnySyncEphemeralRoomEvent,
    serde::Raw,
};
use tokio::{
    select,
    sync::{
        Mutex, RwLock,
        broadcast::{Receiver, Sender, channel, error::RecvError},
        mpsc,
    },
};
use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, trace, warn};

use crate::{
    Client,
    client::WeakClient,
    send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
};

mod deduplicator;
mod pagination;
mod room;

pub use pagination::{RoomPagination, RoomPaginationStatus};
pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate};

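/// An error observed in the [`EventCache`].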
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The event cache instance hasn't subscribed to sync responses yet.
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// The requested room isn't known to the client.
    #[error("Room `{room_id}` is not found.")]
    RoomNotFound {
        /// The identifier of the room that wasn't found.
        room_id: OwnedRoomId,
    },

    /// An error happened while back-paginating.
    #[error(transparent)]
    BackpaginationError(Box<crate::Error>),

    /// A back-pagination was already happening for this room.
    #[error("We were already back-paginating.")]
    AlreadyBackpaginating,

    /// An error happened when interacting with the event cache's storage.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happened when taking the cross-process lock on the storage.
    #[error(transparent)]
    LockingStorage(#[from] CrossProcessLockError),

    /// The client owning the event cache has been dropped.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happened when lazy-loading a linked chunk.
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),

    /// The linked chunk metadata read from storage is invalid.
    #[error("the linked chunk metadata is invalid: {details}")]
    InvalidLinkedChunkMetadata {
        /// A human-readable description of the inconsistency.
        details: String,
    },
}

/// A result type using [`EventCacheError`] as the error type.
pub type Result<T> = std::result::Result<T, EventCacheError>;

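/// Handles to the tasks spawned by the [`EventCache`]; all the tasks are
/// aborted when this struct is dropped.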
pub struct EventCacheDropHandles {
    /// Task that listens to room updates.
    listen_updates_task: JoinHandle<()>,

    /// Task that listens to updates to the user's ignore list.
    ignore_user_list_update_task: JoinHandle<()>,

    /// Task that automatically shrinks the linked chunks of rooms without
    /// subscribers.
    auto_shrink_linked_chunk_task: JoinHandle<()>,
}

impl fmt::Debug for EventCacheDropHandles {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
    }
}

impl Drop for EventCacheDropHandles {
    fn drop(&mut self) {
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
        self.auto_shrink_linked_chunk_task.abort();
    }
}

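/// An event cache, providing lots of useful functionality for clients.
///
/// Cloning is shallow, and thus is cheap to do.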
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache.
    inner: Arc<EventCacheInner>,
}

impl fmt::Debug for EventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCache").finish_non_exhaustive()
    }
}

impl EventCache {
    /// Creates a new default event cache.
    pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
        let (generic_update_sender, _) = channel(32);
        let (linked_chunk_update_sender, _) = channel(32);

        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(32);
        let thread_subscriber_task = AbortOnDrop::new(spawn(Self::thread_subscriber_task(
            client.clone(),
            linked_chunk_update_sender.clone(),
            thread_subscriber_sender,
        )));

        #[cfg(feature = "experimental-search")]
        let search_indexing_task = AbortOnDrop::new(spawn(Self::search_indexing_task(
            client.clone(),
            linked_chunk_update_sender.clone(),
        )));

        Self {
            inner: Arc::new(EventCacheInner {
                client,
                store: event_cache_store,
                multiple_room_updates_lock: Default::default(),
                by_room: Default::default(),
                drop_handles: Default::default(),
                auto_shrink_sender: Default::default(),
                generic_update_sender,
                linked_chunk_update_sender,
                _thread_subscriber_task: thread_subscriber_task,
                #[cfg(feature = "experimental-search")]
                _search_indexing_task: search_indexing_task,
                thread_subscriber_receiver,
            }),
        }
    }

    /// Subscribe to notifications sent whenever the thread subscriber task
    /// has subscribed the user to a new thread.
    #[doc(hidden)]
    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
        self.inner.thread_subscriber_receiver.resubscribe()
    }

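    /// Starts subscribing the [`EventCache`] to sync responses, if not done
    /// before.
    ///
    /// Re-running this method multiple times is a cheap no-op.
    ///
    /// A minimal usage sketch (the `client` is assumed to be logged in):
    ///
    /// ```no_run
    /// # async fn example(client: matrix_sdk::Client) -> Result<(), Box<dyn std::error::Error>> {
    /// let event_cache = client.event_cache();
    /// // Must be called once before asking for room event caches.
    /// event_cache.subscribe()?;
    /// # Ok(()) }
    /// ```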
    pub fn subscribe(&self) -> Result<()> {
        let client = self.inner.client()?;

        let _ = self.inner.drop_handles.get_or_init(|| {
            // Spawn the task that will listen to all the room updates at once.
            let listen_updates_task = spawn(Self::listen_task(
                self.inner.clone(),
                client.subscribe_to_all_room_updates(),
            ));

            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
                self.inner.clone(),
                client.subscribe_to_ignore_user_list_changes(),
            ));

            let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);

            // The sender must be stored before the task reading from the receiver
            // starts.
            self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);

            let auto_shrink_linked_chunk_task = spawn(Self::auto_shrink_linked_chunk_task(
                Arc::downgrade(&self.inner),
                auto_shrink_receiver,
            ));

            Arc::new(EventCacheDropHandles {
                listen_updates_task,
                ignore_user_list_update_task,
                auto_shrink_linked_chunk_task,
            })
        });

        Ok(())
    }

    #[instrument(skip_all)]
    async fn ignore_user_list_update_task(
        inner: Arc<EventCacheInner>,
        mut ignore_user_list_stream: Subscriber<Vec<String>>,
    ) {
        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
        span.follows_from(Span::current());

        async move {
            while ignore_user_list_stream.next().await.is_some() {
                info!("Received an ignore user list change");
                if let Err(err) = inner.clear_all_rooms().await {
                    error!("when clearing room storage after ignore user list change: {err}");
                }
            }
            info!("Ignore user list stream has closed");
        }
        .instrument(span)
        .await;
    }

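    /// Handle a set of room updates, as provided in a response to a sync.
    ///
    /// Exposed for testing purposes only.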
    #[doc(hidden)]
    pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        self.inner.handle_room_updates(updates).await
    }

    #[instrument(skip_all)]
    async fn listen_task(
        inner: Arc<EventCacheInner>,
        mut room_updates_feed: Receiver<RoomUpdates>,
    ) {
        trace!("Spawning the listen task");
        loop {
            match room_updates_feed.recv().await {
                Ok(updates) => {
                    trace!("Receiving `RoomUpdates`");

                    if let Err(err) = inner.handle_room_updates(updates).await {
                        match err {
                            EventCacheError::ClientDropped => {
                                // The client has been dropped; exit the task.
                                info!(
                                    "Closing the event cache global listen task because client dropped"
                                );
                                break;
                            }
                            err => {
                                error!("Error when handling room updates: {err}");
                            }
                        }
                    }
                }

                Err(RecvError::Lagged(num_skipped)) => {
                    // We lagged behind the sync responses and lost some updates; the
                    // caches may now be inconsistent with the rooms' actual states, so
                    // clear everything.
                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
                    if let Err(err) = inner.clear_all_rooms().await {
                        error!("when clearing storage after lag in listen_task: {err}");
                    }
                }

                Err(RecvError::Closed) => {
                    // The sender has closed; exit the task.
                    info!("Closing the event cache global listen task because receiver closed");
                    break;
                }
            }
        }
    }

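    /// The task dedicated to automatically shrinking a room's linked chunk
    /// back to its last chunk, when the room has no more subscribers.
    ///
    /// Only holds a weak reference to the [`EventCacheInner`], so the task
    /// doesn't keep the event cache alive by itself.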
    #[instrument(skip_all)]
    async fn auto_shrink_linked_chunk_task(
        inner: Weak<EventCacheInner>,
        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
    ) {
        while let Some(room_id) = rx.recv().await {
            trace!(for_room = %room_id, "received notification to shrink");

            let Some(inner) = inner.upgrade() else {
                return;
            };

            let room = match inner.for_room(&room_id).await {
                Ok(room) => room,
                Err(err) => {
                    warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
                    continue;
                }
            };

            trace!("waiting for state lock…");
            let mut state = room.inner.state.write().await;

            match state.auto_shrink_if_no_subscribers().await {
                Ok(diffs) => {
                    if let Some(diffs) = diffs {
                        // The shrink happened: propagate the resulting diffs to any late
                        // subscriber.
                        if !diffs.is_empty() {
                            let _ = room.inner.sender.send(
                                RoomEventCacheUpdate::UpdateTimelineEvents {
                                    diffs,
                                    origin: EventsOrigin::Cache,
                                },
                            );
                        }
                    } else {
                        debug!("auto-shrinking didn't happen");
                    }
                }

                Err(err) => {
                    // There's not much we can do here; log and move on.
                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
                }
            }
        }

        info!("Auto-shrink linked chunk task has been closed, exiting");
    }

    /// Return a room-specific view over the [`EventCache`], along with the
    /// drop handles keeping the background tasks alive.
    pub(crate) async fn for_room(
        &self,
        room_id: &RoomId,
    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
            return Err(EventCacheError::NotSubscribedYet);
        };

        let room = self.inner.for_room(room_id).await?;

        Ok((room, drop_handles))
    }

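    /// Clears all the rooms' data, in memory and in the persisted storage.
    ///
    /// Observers are notified with timeline diffs resetting each room to an
    /// empty state.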
    pub async fn clear_all_rooms(&self) -> Result<()> {
        self.inner.clear_all_rooms().await
    }

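    /// Subscribe to room _generic_ updates, aka "something in the room has
    /// changed".
    ///
    /// This channel tells *which* room changed, but not *what* changed;
    /// prefer [`RoomEventCache::subscribe`] to observe the actual changes of
    /// a specific room.
    ///
    /// A minimal listening sketch (the `client` is assumed to be logged in
    /// and the event cache already subscribed):
    ///
    /// ```no_run
    /// # async fn example(client: matrix_sdk::Client) -> Result<(), Box<dyn std::error::Error>> {
    /// let event_cache = client.event_cache();
    /// let mut generic_updates = event_cache.subscribe_to_room_generic_updates();
    ///
    /// while let Ok(update) = generic_updates.recv().await {
    ///     println!("something happened in room {}", update.room_id);
    /// }
    /// # Ok(()) }
    /// ```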
    pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
        self.inner.generic_update_sender.subscribe()
    }

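    /// Handle a single linked chunk update received by the thread subscriber
    /// task, and subscribe the user to a thread if one of the new events
    /// would cause a notification.
    ///
    /// Returns `true` if the task should keep running, `false` if the client
    /// is shutting down.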
    #[instrument(skip(client, thread_subscriber_sender))]
    async fn handle_thread_subscriber_linked_chunk_update(
        client: &WeakClient,
        thread_subscriber_sender: &Sender<()>,
        up: RoomEventCacheLinkedChunkUpdate,
    ) -> bool {
        let Some(client) = client.get() else {
            debug!("Client is shutting down, exiting thread subscriber task");
            return false;
        };

        let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
            trace!("received an update for a non-thread linked chunk, ignoring");
            return true;
        };

        let Some(room) = client.get_room(room_id) else {
            warn!(%room_id, "unknown room");
            return true;
        };

        let thread_root = thread_root.clone();

        let mut new_events = up.events().peekable();

        if new_events.peek().is_none() {
            // No new events: nothing to do.
            return true;
        }

        // Compute the push context without the thread subscription status, since
        // we're precisely trying to decide whether to subscribe.
        let with_thread_subscriptions = false;

        let Some(push_context) = room
            .push_context_internal(with_thread_subscriptions)
            .await
            .inspect_err(|err| {
                warn!("Failed to get push context for threads: {err}");
            })
            .ok()
            .flatten()
        else {
            warn!("Missing push context for thread subscriptions.");
            return true;
        };

        let mut subscribe_up_to = None;

        // Find the most recent event that would trigger a notification, iterating
        // from the newest to the oldest.
        for ev in new_events.rev() {
            if push_context
                .for_event(ev.raw())
                .await
                .into_iter()
                .any(|action| action.should_notify())
            {
                let Some(event_id) = ev.event_id() else {
                    continue;
                };
                subscribe_up_to = Some(event_id);
                break;
            }
        }

        // If such an event exists, subscribe to the thread, up to that event.
        if let Some(event_id) = subscribe_up_to {
            trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
            if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
                warn!(%err, "Failed to subscribe to thread");
            } else {
                let _ = thread_subscriber_sender.send(());
            }
        }

        true
    }

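    /// Handle a single send queue update received by the thread subscriber
    /// task.
    ///
    /// Tracks local echoes of thread replies in `events_being_sent`
    /// (transaction id to thread root), and subscribes the user to the thread
    /// once the reply has actually been sent.
    ///
    /// Returns `true` if the task should keep running, `false` if the client
    /// is shutting down.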
    #[instrument(skip(client, thread_subscriber_sender))]
    async fn handle_thread_subscriber_send_queue_update(
        client: &WeakClient,
        thread_subscriber_sender: &Sender<()>,
        events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
        up: SendQueueUpdate,
    ) -> bool {
        let Some(client) = client.get() else {
            debug!("Client is shutting down, exiting thread subscriber task");
            return false;
        };

        let room_id = up.room_id;
        let Some(room) = client.get_room(&room_id) else {
            warn!(%room_id, "unknown room");
            return true;
        };

        let (thread_root, subscribe_up_to) = match up.update {
            RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
                match local_echo.content {
                    LocalEchoContent::Event { serialized_event, .. } => {
                        if let Some(thread_root) =
                            extract_thread_root_from_content(serialized_event.into_raw().0)
                        {
                            events_being_sent.insert(local_echo.transaction_id, thread_root);
                        }
                    }
                    LocalEchoContent::React { .. } => {
                        // Reactions don't cause thread subscriptions.
                    }
                }
                return true;
            }

            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
                events_being_sent.remove(&transaction_id);
                return true;
            }

            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
                if let Some(thread_root) =
                    extract_thread_root_from_content(new_content.into_raw().0)
                {
                    events_being_sent.insert(transaction_id, thread_root);
                } else {
                    // The replacement may have removed the thread relationship: stop
                    // tracking this transaction.
                    events_being_sent.remove(&transaction_id);
                }
                return true;
            }

            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
                if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
                    (thread_root, event_id)
                } else {
                    trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
                    return true;
                }
            }

            RoomSendQueueUpdate::SendError { .. }
            | RoomSendQueueUpdate::RetryEvent { .. }
            | RoomSendQueueUpdate::MediaUpload { .. } => {
                // Nothing to do for these updates.
                return true;
            }
        };

        trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");
        if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await
        {
            warn!(%err, "Failed to subscribe to thread");
        } else {
            let _ = thread_subscriber_sender.send(());
        }

        true
    }

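    /// The task responsible for subscribing the user to threads they show
    /// interest in: it observes both the send queue (outgoing thread replies)
    /// and the linked chunk updates (incoming thread events that would
    /// notify).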
    #[instrument(skip_all)]
    async fn thread_subscriber_task(
        client: WeakClient,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
        thread_subscriber_sender: Sender<()>,
    ) {
        let mut send_q_rx = if let Some(client) = client.get() {
            if !client.enabled_thread_subscriptions() {
                trace!("Thread subscriptions are not enabled, not spawning thread subscriber task");
                return;
            }

            client.send_queue().subscribe()
        } else {
            trace!("Client is shutting down, not spawning thread subscriber task");
            return;
        };

        let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();

        // Maps the transaction id of each in-flight event to its thread root, if
        // any.
        let mut events_being_sent = HashMap::new();

        loop {
            select! {
                res = send_q_rx.recv() => {
                    match res {
                        Ok(up) => {
                            if !Self::handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
                                break;
                            }
                        }
                        Err(RecvError::Closed) => {
                            debug!("Send queue update channel has been closed, exiting thread subscriber task");
                            break;
                        }
                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(num_skipped, "Lagged behind send queue updates");
                        }
                    }
                }

                res = linked_chunk_rx.recv() => {
                    match res {
                        Ok(up) => {
                            if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
                                break;
                            }
                        }
                        Err(RecvError::Closed) => {
                            debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
                            break;
                        }
                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(num_skipped, "Lagged behind linked chunk updates");
                        }
                    }
                }
            }
        }
    }

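    /// The task responsible for keeping the search index up to date: it
    /// observes the linked chunk updates and feeds every new room event into
    /// the search index.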
    #[cfg(feature = "experimental-search")]
    #[instrument(skip_all)]
    async fn search_indexing_task(
        client: WeakClient,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
    ) {
        let mut linked_chunk_update_receiver = linked_chunk_update_sender.subscribe();

        loop {
            match linked_chunk_update_receiver.recv().await {
                Ok(room_ec_lc_update) => {
                    let OwnedLinkedChunkId::Room(room_id) =
                        room_ec_lc_update.linked_chunk_id.clone()
                    else {
                        trace!("Received non-room updates, ignoring.");
                        continue;
                    };

                    let mut timeline_events = room_ec_lc_update.events().peekable();

                    if timeline_events.peek().is_none() {
                        continue;
                    }

                    let Some(client) = client.get() else {
                        trace!("Client is shutting down, exiting search indexing task");
                        return;
                    };

                    let (room_cache, _drop_handles) =
                        match client.event_cache().for_room(&room_id).await {
                            Ok(room_cache) => room_cache,
                            Err(err) => {
                                warn!(for_room = %room_id, "Failed to get RoomEventCache: {err}");
                                continue;
                            }
                        };

                    let Some(room) = client.get_room(&room_id) else {
                        warn!(get_room = %room_id, "Failed to get room while indexing");
                        continue;
                    };
                    let redaction_rules =
                        room.clone_info().room_version_rules_or_default().redaction;

                    let mut search_index_guard = client.search_index().lock().await;

                    if let Err(err) = search_index_guard
                        .bulk_handle_timeline_event(
                            timeline_events,
                            &room_cache,
                            &room_id,
                            &redaction_rules,
                        )
                        .await
                    {
                        error!("Failed to handle events for indexing: {err}")
                    }
                }
                Err(RecvError::Closed) => {
                    debug!(
                        "Linked chunk update channel has been closed, exiting search indexing task"
                    );
                    break;
                }
                Err(RecvError::Lagged(num_skipped)) => {
                    warn!(num_skipped, "Lagged behind linked chunk updates");
                }
            }
        }
    }
}

struct EventCacheInner {
    /// A weak reference to the inner client: useful when trying to get a
    /// handle on the owning client.
    client: WeakClient,

    /// Reference to the underlying store.
    store: EventCacheStoreLock,

    /// A lock used when many rooms must be updated at once, so the updates
    /// are applied in their order of arrival.
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-filled cache of live [`RoomEventCache`], one per room.
    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,

    /// Handles to the background tasks; filled by [`EventCache::subscribe`].
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// A sender for notifications that a room *may* need to be auto-shrunk.
    ///
    /// It lives here so it can be passed to each [`RoomEventCache`] instance.
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,

    /// A sender for room generic updates; see
    /// [`EventCache::subscribe_to_room_generic_updates`].
    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

    /// A sender for linked chunk updates, observed by the internal tasks
    /// (thread subscriber, search indexing).
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// The background task subscribing the user to threads; aborted when the
    /// event cache is dropped.
    _thread_subscriber_task: AbortOnDrop<()>,

    /// The background task feeding new events into the search index; aborted
    /// when the event cache is dropped.
    #[cfg(feature = "experimental-search")]
    _search_indexing_task: AbortOnDrop<()>,

    /// A receiver that keeps the thread subscriber channel open, and from
    /// which new receivers are created; see
    /// [`EventCache::subscribe_thread_subscriber_updates`].
    thread_subscriber_receiver: Receiver<()>,
}

/// The payload of the auto-shrink channel: the identifier of the room whose
/// linked chunk may be shrunk.
type AutoShrinkChannelPayload = OwnedRoomId;

impl EventCacheInner {
    fn client(&self) -> Result<Client> {
        self.client.get().ok_or(EventCacheError::ClientDropped)
    }

    /// Clears all the rooms' data.
    ///
    /// To avoid races, the locks are acquired in a fixed order: the `by_room`
    /// map first, then every room's state, and only then the store lock.
    async fn clear_all_rooms(&self) -> Result<()> {
        let rooms = self.by_room.write().await;

        let room_locks = join_all(
            rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
        )
        .await;

        // Clear the storage for all the rooms, using the storage facility.
        self.store.lock().await?.clear_all_linked_chunks().await?;

        // At this point, all the in-memory linked chunks are desynchronized from the
        // storage. Reset them, and notify observers about the new (empty) state.
        try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move {
            let updates_as_vector_diffs = state_guard.reset().await?;

            let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: updates_as_vector_diffs,
                origin: EventsOrigin::Cache,
            });

            let _ = room
                .inner
                .generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() });

            Ok::<_, EventCacheError>(())
        }))
        .await?;

        Ok(())
    }

    /// Handles a set of room updates at once.
    #[instrument(skip(self, updates))]
    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        // First, take the lock that indicates we're processing updates, to avoid
        // handling multiple updates concurrently.
        let _lock = {
            let _timer = timer!("Taking the `multiple_room_updates_lock`");
            self.multiple_room_updates_lock.lock().await
        };

        // Left rooms.
        for (room_id, left_room_update) in updates.left {
            let room = self.for_room(&room_id).await?;

            if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
                // Non-fatal error: log and continue with the other rooms.
                error!("handling left room update: {err}");
            }
        }

        // Joined rooms.
        for (room_id, joined_room_update) in updates.joined {
            trace!(?room_id, "Handling a `JoinedRoomUpdate`");

            let room = self.for_room(&room_id).await?;

            if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
                // Non-fatal error: log and continue with the other rooms.
                error!(%room_id, "handling joined room update: {err}");
            }
        }

        Ok(())
    }

    /// Return a room-specific view over the [`EventCache`], creating it if it
    /// doesn't exist yet.
    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
        // Fast path: the entry exists; a read lock is cheaper than a write lock.
        let by_room_guard = self.by_room.read().await;

        match by_room_guard.get(room_id) {
            Some(room) => Ok(room.clone()),

            None => {
                // Slow path: the entry doesn't exist; acquire a write lock.
                drop(by_room_guard);
                let mut by_room_guard = self.by_room.write().await;

                // In the meanwhile, another caller might have created the room; check
                // again after acquiring the write lock.
                if let Some(room) = by_room_guard.get(room_id) {
                    return Ok(room.clone());
                }

                let pagination_status =
                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });

                let Some(client) = self.client.get() else {
                    return Err(EventCacheError::ClientDropped);
                };

                let room = client
                    .get_room(room_id)
                    .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
                let room_version_rules = room.clone_info().room_version_rules_or_default();

                let enabled_thread_support = matches!(
                    client.base_client().threading_support,
                    ThreadingSupport::Enabled { .. }
                );

                let room_state = RoomEventCacheState::new(
                    room_id.to_owned(),
                    room_version_rules,
                    enabled_thread_support,
                    self.linked_chunk_update_sender.clone(),
                    self.store.clone(),
                    pagination_status.clone(),
                )
                .await?;

                let timeline_is_not_empty =
                    room_state.room_linked_chunk().revents().next().is_some();

                let auto_shrink_sender =
                    self.auto_shrink_sender.get().cloned().expect(
                        "we must have called `EventCache::subscribe()` before calling here.",
                    );

                let room_event_cache = RoomEventCache::new(
                    self.client.clone(),
                    room_state,
                    pagination_status,
                    room_id.to_owned(),
                    auto_shrink_sender,
                    self.generic_update_sender.clone(),
                );

                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());

                // If the just-loaded room isn't empty, send a generic update, so
                // observers know this room has events in its cache.
                if timeline_is_not_empty {
                    let _ = self
                        .generic_update_sender
                        .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
                }

                Ok(room_event_cache)
            }
        }
    }
}

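/// The result of a single back-pagination request.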
#[derive(Debug)]
pub struct BackPaginationOutcome {
    /// Did the back-pagination reach the start of the timeline?
    pub reached_start: bool,

    /// All the events that have been returned in the back-pagination request.
    ///
    /// Events are presented in reverse order: the first element of the vec,
    /// if present, is the most "recent" event from the chunk (or technically,
    /// the last one in the topological ordering).
    pub events: Vec<TimelineEvent>,
}

/// A generic update about a room: something in it has changed, without saying
/// what. Use [`RoomEventCache::subscribe`] to observe the actual changes.
#[derive(Clone, Debug)]
pub struct RoomEventCacheGenericUpdate {
    /// The room which has been updated.
    pub room_id: OwnedRoomId,
}

/// An update that has been applied to one of the linked chunks owned by the
/// event cache.
#[derive(Clone, Debug)]
struct RoomEventCacheLinkedChunkUpdate {
    /// The linked chunk affected by the update.
    linked_chunk_id: OwnedLinkedChunkId,

    /// The updates that have been applied to the linked chunk, in order.
    updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
}

impl RoomEventCacheLinkedChunkUpdate {
    /// Return all the new events carried by this update, in topological
    /// order.
    pub fn events(self) -> impl DoubleEndedIterator<Item = TimelineEvent> {
        use itertools::Either;
        self.updates.into_iter().flat_map(|update| match update {
            linked_chunk::Update::PushItems { items, .. } => {
                Either::Left(Either::Left(items.into_iter()))
            }
            linked_chunk::Update::ReplaceItem { item, .. } => {
                Either::Left(Either::Right(std::iter::once(item)))
            }
            linked_chunk::Update::RemoveItem { .. }
            | linked_chunk::Update::DetachLastItems { .. }
            | linked_chunk::Update::StartReattachItems
            | linked_chunk::Update::EndReattachItems
            | linked_chunk::Update::NewItemsChunk { .. }
            | linked_chunk::Update::NewGapChunk { .. }
            | linked_chunk::Update::RemoveChunk(..)
            | linked_chunk::Update::Clear => {
                // None of these updates introduce new events.
                Either::Right(std::iter::empty())
            }
        })
    }
}

/// An update related to events happened in a room.
#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
    /// The fully read marker has moved to a different event.
    MoveReadMarkerTo {
        /// Event at which the read marker is now pointing.
        event_id: OwnedEventId,
    },

    /// The members have changed.
    UpdateMembers {
        /// Collection of ambiguity changes that room member events trigger,
        /// keyed by the ID of the `m.room.member` event that caused them.
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    },

    /// The room has received updates for the timeline as _diffs_.
    UpdateTimelineEvents {
        /// Diffs to apply to the timeline.
        diffs: Vec<VectorDiff<TimelineEvent>>,

        /// Where the diffs are coming from.
        origin: EventsOrigin,
    },

    /// The room has received new ephemeral events.
    AddEphemeralEvents {
        /// The ephemeral events, e.g. read receipts or typing notifications.
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    },
}

/// Indicate where events are coming from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events are coming from a sync.
    Sync,

    /// Events are coming from pagination.
    Pagination,

    /// The cause of the change is purely internal to the cache.
    Cache,
}

#[cfg(test)]
mod tests {
    use std::{ops::Not, sync::Arc, time::Duration};

    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::{
        RoomState,
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
    };
    use matrix_sdk_test::{
        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
    };
    use ruma::{event_id, room_id, serde::Raw, user_id};
    use serde_json::json;
    use tokio::time::sleep;

    use super::{EventCacheError, RoomEventCacheGenericUpdate, RoomEventCacheUpdate};
    use crate::test_utils::{
        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
    };

    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // Requesting a room event cache before subscribing the event cache must
        // fail.
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    #[async_test]
    async fn test_uniq_read_marker() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
        let (events, mut stream) = room_event_cache.subscribe().await;

        assert!(events.is_empty());

        // When the same read marker event is received many times…
        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
            .await
            .unwrap();

        // … only a single read marker update is propagated.
        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        assert!(stream.recv().now_or_never().is_none());

        // Account data doesn't trigger a generic update.
        assert!(generic_stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
        client.base_client().get_or_create_room(room_id2, RoomState::Joined);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Insert two rooms with a few events each.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
        updates.joined.insert(room_id2.to_owned(), joined_room_update2);

        // Have the event cache handle the updates.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // Now retrieve the events one by one.
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.find_event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.find_event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        // The third event belongs to the other room, so it can't be found here.
        assert!(room_event_cache.find_event(eid3).await.is_none());
    }

    #[async_test]
    async fn test_save_event() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;

        // The saved event can be found again.
        assert!(room_event_cache.find_event(event_id).await.is_some());
    }

    #[async_test]
    async fn test_generic_update_when_loading_rooms() {
        // Create a client with two rooms: one with an event in the event cache
        // store, and one empty.
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id_0 = room_id!("!raclette:patate.ch");
        let room_id_1 = room_id!("!fondue:patate.ch");

        let event_factory = EventFactory::new().room(room_id_0).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        client
            .event_cache_store()
            .lock()
            .await
            .unwrap()
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id_0),
                vec![
                    // A chunk with a single event.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Loading the room with an event must trigger a generic update.
        {
            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();

            assert_matches!(
                generic_stream.recv().await,
                Ok(RoomEventCacheGenericUpdate { room_id }) => {
                    assert_eq!(room_id, room_id_0);
                }
            );
        }

        // Loading the empty room must not trigger a generic update.
        {
            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();

            assert!(generic_stream.recv().now_or_never().is_none());
        }
    }

    #[async_test]
    async fn test_generic_update_when_paginating_room() {
        // Create a room with several chunks in the event cache store, so that
        // back-paginations load them one by one.
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_factory = EventFactory::new().room(room_id).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        client
            .event_cache_store()
            .lock()
            .await
            .unwrap()
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // Another empty items chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    // A chunk with one event.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                    // A last chunk with one event.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(2)),
                        new: ChunkIdentifier::new(3),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(3), 0),
                        items: vec![
                            event_factory
                                .text_msg("world")
                                .sender(user)
                                .event_id(event_id!("$ev1"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Loading the room triggers a generic update, since the last chunk has an
        // event.
        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let pagination = room_event_cache.pagination();

        // The first back-pagination loads one event: a generic update is expected.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert_eq!(pagination_outcome.events.len(), 1);
        assert!(pagination_outcome.reached_start.not());
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        // The next back-pagination loads an empty chunk: no generic update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.events.is_empty());
        assert!(pagination_outcome.reached_start.not());
        assert!(generic_stream.recv().now_or_never().is_none());

        // The last back-pagination reaches the start of the timeline: still no
        // generic update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.reached_start);
        assert!(generic_stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_for_room_when_room_is_not_found() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Requesting the event cache for an unknown room must fail.
        assert_matches!(
            event_cache.for_room(room_id).await,
            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
                assert_eq!(room_id, not_found_room_id);
            }
        );

        // Once the room exists, the same call must succeed.
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        assert!(event_cache.for_room(room_id).await.is_ok());
    }

    #[cfg(not(target_family = "wasm"))]
    #[async_test]
    async fn test_no_refcycle_event_cache_tasks() {
        let client = MockClientBuilder::new(None).build().await;

        // Wait for the background tasks to settle.
        sleep(Duration::from_secs(1)).await;

        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
        assert_eq!(event_cache_weak.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            // Sync a room, subscribe the event cache, and grab a room event cache.
            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Give the spawned tasks some time to shut down.
        sleep(Duration::from_secs(1)).await;

        // The event cache tasks must not keep the event cache alive through a
        // reference cycle.
        assert_eq!(
            event_cache_weak.strong_count(),
            0,
            "Too many strong references to the event cache {}",
            event_cache_weak.strong_count()
        );
    }
}