#![forbid(missing_docs)]

use std::{
    collections::{BTreeMap, HashMap},
    fmt,
    sync::{Arc, OnceLock, Weak},
};

use eyeball::{SharedObservable, Subscriber};
use eyeball_im::VectorDiff;
use futures_util::future::{join_all, try_join_all};
use matrix_sdk_base::{
    ThreadingSupport,
    cross_process_lock::CrossProcessLockError,
    deserialized_responses::{AmbiguityChange, TimelineEvent},
    event_cache::{
        Gap,
        store::{EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockState},
    },
    executor::AbortOnDrop,
    linked_chunk::{self, OwnedLinkedChunkId, lazy_loader::LazyLoaderError},
    serde_helpers::extract_thread_root_from_content,
    sync::RoomUpdates,
    timer,
};
use matrix_sdk_common::executor::{JoinHandle, spawn};
use ruma::{
    OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId, events::AnySyncEphemeralRoomEvent,
    serde::Raw,
};
use tokio::{
    select,
    sync::{
        Mutex, RwLock,
        broadcast::{Receiver, Sender, channel, error::RecvError},
        mpsc,
    },
};
use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, trace, warn};

use crate::{
    Client,
    client::WeakClient,
    event_cache::room::RoomEventCacheStateLock,
    send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
};

mod deduplicator;
mod pagination;
#[cfg(feature = "e2e-encryption")]
mod redecryptor;
mod room;

pub use pagination::{RoomPagination, RoomPaginationStatus};
#[cfg(feature = "e2e-encryption")]
pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport};
pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate};

#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    #[error("Room `{room_id}` is not found.")]
    RoomNotFound {
        room_id: OwnedRoomId,
    },

    #[error(transparent)]
    BackpaginationError(Box<crate::Error>),

    #[error("We were already back-paginating.")]
    AlreadyBackpaginating,

    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    #[error(transparent)]
    LockingStorage(#[from] CrossProcessLockError),

    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),

    #[error("the linked chunk metadata is invalid: {details}")]
    InvalidLinkedChunkMetadata {
        details: String,
    },
}

pub type Result<T> = std::result::Result<T, EventCacheError>;

pub struct EventCacheDropHandles {
    listen_updates_task: JoinHandle<()>,

    ignore_user_list_update_task: JoinHandle<()>,

    auto_shrink_linked_chunk_task: JoinHandle<()>,

    #[cfg(feature = "e2e-encryption")]
    _redecryptor: redecryptor::Redecryptor,
}

impl fmt::Debug for EventCacheDropHandles {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
    }
}

impl Drop for EventCacheDropHandles {
    fn drop(&mut self) {
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
        self.auto_shrink_linked_chunk_task.abort();
    }
}

#[derive(Clone)]
pub struct EventCache {
    inner: Arc<EventCacheInner>,
}

impl fmt::Debug for EventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCache").finish_non_exhaustive()
    }
}

impl EventCache {
    pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
        let (generic_update_sender, _) = channel(32);
        let (linked_chunk_update_sender, _) = channel(32);

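        // Spawn a task that watches both send queue updates and linked chunk
        // updates, and subscribes to threads containing events that would
        // notify the user (see `thread_subscriber_task`).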
        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(32);
        let thread_subscriber_task = AbortOnDrop::new(spawn(Self::thread_subscriber_task(
            client.clone(),
            linked_chunk_update_sender.clone(),
            thread_subscriber_sender,
        )));

        #[cfg(feature = "experimental-search")]
        let search_indexing_task = AbortOnDrop::new(spawn(Self::search_indexing_task(
            client.clone(),
            linked_chunk_update_sender.clone(),
        )));

        #[cfg(feature = "e2e-encryption")]
        let redecryption_channels = redecryptor::RedecryptorChannels::new();

        Self {
            inner: Arc::new(EventCacheInner {
                client,
                store: event_cache_store,
                multiple_room_updates_lock: Default::default(),
                by_room: Default::default(),
                drop_handles: Default::default(),
                auto_shrink_sender: Default::default(),
                generic_update_sender,
                linked_chunk_update_sender,
                _thread_subscriber_task: thread_subscriber_task,
                #[cfg(feature = "experimental-search")]
                _search_indexing_task: search_indexing_task,
                #[cfg(feature = "e2e-encryption")]
                redecryption_channels,
                thread_subscriber_receiver,
            }),
        }
    }

    #[doc(hidden)]
    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
        self.inner.thread_subscriber_receiver.resubscribe()
    }

    pub fn subscribe(&self) -> Result<()> {
        let client = self.inner.client()?;

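        // The drop handles are initialized at most once: `get_or_init` makes
        // any subsequent call to `subscribe()` a no-op.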
        let _ = self.inner.drop_handles.get_or_init(|| {
            let listen_updates_task = spawn(Self::listen_task(
                self.inner.clone(),
                client.subscribe_to_all_room_updates(),
            ));

            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
                self.inner.clone(),
                client.subscribe_to_ignore_user_list_changes(),
            ));

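            // Rooms post their room id on this channel when their last
            // subscriber goes away, so the auto-shrink task below can shrink
            // their in-memory linked chunk.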
            let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);

            self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);

            let auto_shrink_linked_chunk_task = spawn(Self::auto_shrink_linked_chunk_task(
                Arc::downgrade(&self.inner),
                auto_shrink_receiver,
            ));
            #[cfg(feature = "e2e-encryption")]
            let redecryptor = {
                let receiver = self
                    .inner
                    .redecryption_channels
                    .decryption_request_receiver
                    .lock()
                    .take()
                    .expect(
                        "We should have initialized the channel, and subscribing should happen only once",
                    );

                redecryptor::Redecryptor::new(
                    Arc::downgrade(&self.inner),
                    receiver,
                    &self.inner.linked_chunk_update_sender,
                )
            };

            Arc::new(EventCacheDropHandles {
                listen_updates_task,
                ignore_user_list_update_task,
                auto_shrink_linked_chunk_task,
                #[cfg(feature = "e2e-encryption")]
                _redecryptor: redecryptor,
            })
        });

        Ok(())
    }

    #[instrument(skip_all)]
    async fn ignore_user_list_update_task(
        inner: Arc<EventCacheInner>,
        mut ignore_user_list_stream: Subscriber<Vec<String>>,
    ) {
        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
        span.follows_from(Span::current());

        async move {
            while ignore_user_list_stream.next().await.is_some() {
                info!("Received an ignore user list change");
                if let Err(err) = inner.clear_all_rooms().await {
                    error!("when clearing room storage after ignore user list change: {err}");
                }
            }
            info!("Ignore user list stream has closed");
        }
        .instrument(span)
        .await;
    }

    #[doc(hidden)]
    pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        self.inner.handle_room_updates(updates).await
    }

    #[instrument(skip_all)]
    async fn listen_task(
        inner: Arc<EventCacheInner>,
        mut room_updates_feed: Receiver<RoomUpdates>,
    ) {
        trace!("Spawning the listen task");
        loop {
            match room_updates_feed.recv().await {
                Ok(updates) => {
                    trace!("Receiving `RoomUpdates`");

                    if let Err(err) = inner.handle_room_updates(updates).await {
                        match err {
                            EventCacheError::ClientDropped => {
                                info!(
                                    "Closing the event cache global listen task because client dropped"
                                );
                                break;
                            }
                            err => {
                                error!("Error when handling room updates: {err}");
                            }
                        }
                    }
                }

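                // We lagged behind the room updates stream and have missed
                // some of them, so the cached timelines can't be trusted
                // anymore: clear all rooms and start from a clean slate.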
                Err(RecvError::Lagged(num_skipped)) => {
                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
                    if let Err(err) = inner.clear_all_rooms().await {
                        error!("when clearing storage after lag in listen_task: {err}");
                    }
                }

                Err(RecvError::Closed) => {
                    info!("Closing the event cache global listen task because receiver closed");
                    break;
                }
            }
        }
    }

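    // Each message on the auto-shrink channel names a room whose last
    // subscriber was dropped: shrink that room's in-memory linked chunk if it
    // has no subscribers anymore, and broadcast the resulting timeline diffs.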
    #[instrument(skip_all)]
    async fn auto_shrink_linked_chunk_task(
        inner: Weak<EventCacheInner>,
        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
    ) {
        while let Some(room_id) = rx.recv().await {
            trace!(for_room = %room_id, "received notification to shrink");

            let Some(inner) = inner.upgrade() else {
                return;
            };

            let room = match inner.for_room(&room_id).await {
                Ok(room) => room,
                Err(err) => {
                    warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
                    continue;
                }
            };

            trace!("waiting for state lock…");
            let mut state = match room.inner.state.write().await {
                Ok(state) => state,
                Err(err) => {
                    warn!(for_room = %room_id, "Failed to get the `RoomEventCacheStateLock`: {err}");
                    continue;
                }
            };

            match state.auto_shrink_if_no_subscribers().await {
                Ok(diffs) => {
                    if let Some(diffs) = diffs {
                        if !diffs.is_empty() {
                            let _ = room.inner.update_sender.send(
                                RoomEventCacheUpdate::UpdateTimelineEvents {
                                    diffs,
                                    origin: EventsOrigin::Cache,
                                },
                            );
                        }
                    } else {
                        debug!("auto-shrinking didn't happen");
                    }
                }

                Err(err) => {
                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
                }
            }
        }

        info!("Auto-shrink linked chunk task has been closed, exiting");
    }

    pub fn has_subscribed(&self) -> bool {
        self.inner.drop_handles.get().is_some()
    }

    pub(crate) async fn for_room(
        &self,
        room_id: &RoomId,
    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
            return Err(EventCacheError::NotSubscribedYet);
        };

        let room = self.inner.for_room(room_id).await?;

        Ok((room, drop_handles))
    }

    pub async fn clear_all_rooms(&self) -> Result<()> {
        self.inner.clear_all_rooms().await
    }

    pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
        self.inner.generic_update_sender.subscribe()
    }

    #[instrument(skip(client, thread_subscriber_sender))]
    async fn handle_thread_subscriber_linked_chunk_update(
        client: &WeakClient,
        thread_subscriber_sender: &Sender<()>,
        up: RoomEventCacheLinkedChunkUpdate,
    ) -> bool {
        let Some(client) = client.get() else {
            debug!("Client is shutting down, exiting thread subscriber task");
            return false;
        };

        let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
            trace!("received an update for a non-thread linked chunk, ignoring");
            return true;
        };

        let Some(room) = client.get_room(room_id) else {
            warn!(%room_id, "unknown room");
            return true;
        };

        let thread_root = thread_root.clone();

        let mut new_events = up.events().peekable();

        if new_events.peek().is_none() {
            return true;
        }

        let with_thread_subscriptions = false;
        let Some(push_context) = room
            .push_context_internal(with_thread_subscriptions)
            .await
            .inspect_err(|err| {
                warn!("Failed to get push context for threads: {err}");
            })
            .ok()
            .flatten()
        else {
            warn!("Missing push context for thread subscriptions.");
            return true;
        };

        let mut subscribe_up_to = None;

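        // Iterate from the newest event to the oldest, and stop at the first
        // one that would trigger a notification: that's the event up to which
        // the thread subscription should reach.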
        for ev in new_events.rev() {
            if push_context
                .for_event(ev.raw())
                .await
                .into_iter()
                .any(|action| action.should_notify())
            {
                let Some(event_id) = ev.event_id() else {
                    continue;
                };
                subscribe_up_to = Some(event_id);
                break;
            }
        }

        if let Some(event_id) = subscribe_up_to {
            trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
            if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
                warn!(%err, "Failed to subscribe to thread");
            } else {
                let _ = thread_subscriber_sender.send(());
            }
        }

        true
    }

    #[instrument(skip(client, thread_subscriber_sender))]
    async fn handle_thread_subscriber_send_queue_update(
        client: &WeakClient,
        thread_subscriber_sender: &Sender<()>,
        events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
        up: SendQueueUpdate,
    ) -> bool {
        let Some(client) = client.get() else {
            debug!("Client is shutting down, exiting thread subscriber task");
            return false;
        };

        let room_id = up.room_id;
        let Some(room) = client.get_room(&room_id) else {
            warn!(%room_id, "unknown room");
            return true;
        };

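        // Keep track of thread roots for local echoes while they are in
        // flight; once the server acknowledges an event, we know the thread
        // (and the event id) to subscribe up to.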
        let (thread_root, subscribe_up_to) = match up.update {
            RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
                match local_echo.content {
                    LocalEchoContent::Event { serialized_event, .. } => {
                        if let Some(thread_root) =
                            extract_thread_root_from_content(serialized_event.into_raw().0)
                        {
                            events_being_sent.insert(local_echo.transaction_id, thread_root);
                        }
                    }
                    LocalEchoContent::React { .. } => {}
                }
                return true;
            }

            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
                events_being_sent.remove(&transaction_id);
                return true;
            }

            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
                if let Some(thread_root) =
                    extract_thread_root_from_content(new_content.into_raw().0)
                {
                    events_being_sent.insert(transaction_id, thread_root);
                } else {
                    events_being_sent.remove(&transaction_id);
                }
                return true;
            }

            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
                if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
                    (thread_root, event_id)
                } else {
                    trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
                    return true;
                }
            }

            RoomSendQueueUpdate::SendError { .. }
            | RoomSendQueueUpdate::RetryEvent { .. }
            | RoomSendQueueUpdate::MediaUpload { .. } => {
                return true;
            }
        };

        trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");
        if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await
        {
            warn!(%err, "Failed to subscribe to thread");
        } else {
            let _ = thread_subscriber_sender.send(());
        }

        true
    }

    #[instrument(skip_all)]
    async fn thread_subscriber_task(
        client: WeakClient,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
        thread_subscriber_sender: Sender<()>,
    ) {
        let mut send_q_rx = if let Some(client) = client.get() {
            if !client.enabled_thread_subscriptions() {
                trace!("Thread subscriptions are not enabled, not spawning thread subscriber task");
                return;
            }

            client.send_queue().subscribe()
        } else {
            trace!("Client is shutting down, not spawning thread subscriber task");
            return;
        };

        let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();

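        // Local echoes that are currently being sent, mapped from their
        // transaction id to their thread root. This lets us know which thread
        // to subscribe to once the echo has actually been sent.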
        let mut events_being_sent = HashMap::new();

        loop {
            select! {
                res = send_q_rx.recv() => {
                    match res {
                        Ok(up) => {
                            if !Self::handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
                                break;
                            }
                        }
                        Err(RecvError::Closed) => {
                            debug!("Send queue update channel has been closed, exiting thread subscriber task");
                            break;
                        }
                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(num_skipped, "Lagged behind send queue updates");
                        }
                    }
                }

                res = linked_chunk_rx.recv() => {
                    match res {
                        Ok(up) => {
                            if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
                                break;
                            }
                        }
                        Err(RecvError::Closed) => {
                            debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
                            break;
                        }
                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(num_skipped, "Lagged behind linked chunk updates");
                        }
                    }
                }
            }
        }
    }

    #[cfg(feature = "experimental-search")]
    #[instrument(skip_all)]
    async fn search_indexing_task(
        client: WeakClient,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
    ) {
        let mut linked_chunk_update_receiver = linked_chunk_update_sender.subscribe();

        loop {
            match linked_chunk_update_receiver.recv().await {
                Ok(room_ec_lc_update) => {
                    let OwnedLinkedChunkId::Room(room_id) =
                        room_ec_lc_update.linked_chunk_id.clone()
                    else {
                        trace!("Received non-room updates, ignoring.");
                        continue;
                    };

                    let mut timeline_events = room_ec_lc_update.events().peekable();

                    if timeline_events.peek().is_none() {
                        continue;
                    }

                    let Some(client) = client.get() else {
                        trace!("Client is shutting down, exiting search indexing task");
                        return;
                    };

                    let maybe_room_cache = client.event_cache().for_room(&room_id).await;
                    let Ok((room_cache, _drop_handles)) = maybe_room_cache else {
                        warn!(for_room = %room_id, "Failed to get RoomEventCache: {maybe_room_cache:?}");
                        continue;
                    };

                    let maybe_room = client.get_room(&room_id);
                    let Some(room) = maybe_room else {
                        warn!(get_room = %room_id, "Failed to get room while indexing: {maybe_room:?}");
                        continue;
                    };
                    let redaction_rules =
                        room.clone_info().room_version_rules_or_default().redaction;

                    let mut search_index_guard = client.search_index().lock().await;

                    if let Err(err) = search_index_guard
                        .bulk_handle_timeline_event(
                            timeline_events,
                            &room_cache,
                            &room_id,
                            &redaction_rules,
                        )
                        .await
                    {
                        error!("Failed to handle events for indexing: {err}")
                    }
                }
                Err(RecvError::Closed) => {
                    debug!(
                        "Linked chunk update channel has been closed, exiting search indexing task"
                    );
                    break;
                }
                Err(RecvError::Lagged(num_skipped)) => {
                    warn!(num_skipped, "Lagged behind linked chunk updates");
                }
            }
        }
    }
}

struct EventCacheInner {
    client: WeakClient,

    store: EventCacheStoreLock,

    multiple_room_updates_lock: Mutex<()>,

    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,

    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,

    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    _thread_subscriber_task: AbortOnDrop<()>,

    #[cfg(feature = "experimental-search")]
    _search_indexing_task: AbortOnDrop<()>,

    thread_subscriber_receiver: Receiver<()>,

    #[cfg(feature = "e2e-encryption")]
    redecryption_channels: redecryptor::RedecryptorChannels,
}

type AutoShrinkChannelPayload = OwnedRoomId;

impl EventCacheInner {
    fn client(&self) -> Result<Client> {
        self.client.get().ok_or(EventCacheError::ClientDropped)
    }

    async fn clear_all_rooms(&self) -> Result<()> {
        let rooms = self.by_room.write().await;
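        // Note: all the room state locks are taken before the store is
        // cleared, so that no room should be able to persist new data while
        // the linked chunks are being wiped.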

        let room_locks = join_all(
            rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
        )
        .await;

        let store_guard = match self.store.lock().await? {
            EventCacheStoreLockState::Clean(store_guard) => store_guard,
            EventCacheStoreLockState::Dirty(store_guard) => store_guard,
        };
        store_guard.clear_all_linked_chunks().await?;

        try_join_all(room_locks.into_iter().map(|(room, state_guard)| async move {
            let mut state_guard = state_guard?;
            let updates_as_vector_diffs = state_guard.reset().await?;

            let _ = room.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: updates_as_vector_diffs,
                origin: EventsOrigin::Cache,
            });

            let _ = room
                .inner
                .generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() });

            Ok::<_, EventCacheError>(())
        }))
        .await?;

        Ok(())
    }

    #[instrument(skip(self, updates))]
    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        let _lock = {
            let _timer = timer!("Taking the `multiple_room_updates_lock`");
            self.multiple_room_updates_lock.lock().await
        };

        for (room_id, left_room_update) in updates.left {
            let room = self.for_room(&room_id).await?;

            if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
                error!("handling left room update: {err}");
            }
        }

        for (room_id, joined_room_update) in updates.joined {
            trace!(?room_id, "Handling a `JoinedRoomUpdate`");

            let room = self.for_room(&room_id).await?;

            if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
                error!(%room_id, "handling joined room update: {err}");
            }
        }

        Ok(())
    }

    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
        let by_room_guard = self.by_room.read().await;

        match by_room_guard.get(room_id) {
            Some(room) => Ok(room.clone()),

            None => {
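                // Slow path: this room's event cache hasn't been created yet.
                // Drop the read lock, take the write lock, and check again in
                // case another task created the entry in the meantime.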
                drop(by_room_guard);
                let mut by_room_guard = self.by_room.write().await;

                if let Some(room) = by_room_guard.get(room_id) {
                    return Ok(room.clone());
                }

                let pagination_status =
                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });

                let Some(client) = self.client.get() else {
                    return Err(EventCacheError::ClientDropped);
                };

                let room = client
                    .get_room(room_id)
                    .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
                let room_version_rules = room.clone_info().room_version_rules_or_default();

                let enabled_thread_support = matches!(
                    client.base_client().threading_support,
                    ThreadingSupport::Enabled { .. }
                );

                let update_sender = Sender::new(32);

                let room_state = RoomEventCacheStateLock::new(
                    room_id.to_owned(),
                    room_version_rules,
                    enabled_thread_support,
                    update_sender.clone(),
                    self.generic_update_sender.clone(),
                    self.linked_chunk_update_sender.clone(),
                    self.store.clone(),
                    pagination_status.clone(),
                )
                .await?;

                let timeline_is_not_empty =
                    room_state.read().await?.room_linked_chunk().revents().next().is_some();

                let auto_shrink_sender =
                    self.auto_shrink_sender.get().cloned().expect(
                        "we must have called `EventCache::subscribe()` before calling here.",
                    );

                let room_event_cache = RoomEventCache::new(
                    self.client.clone(),
                    room_state,
                    pagination_status,
                    room_id.to_owned(),
                    auto_shrink_sender,
                    update_sender,
                    self.generic_update_sender.clone(),
                );

                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());

                if timeline_is_not_empty {
                    let _ = self
                        .generic_update_sender
                        .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
                }

                Ok(room_event_cache)
            }
        }
    }
}

#[derive(Debug)]
pub struct BackPaginationOutcome {
    pub reached_start: bool,

    pub events: Vec<TimelineEvent>,
}

#[derive(Clone, Debug)]
pub struct RoomEventCacheGenericUpdate {
    pub room_id: OwnedRoomId,
}

#[derive(Clone, Debug)]
struct RoomEventCacheLinkedChunkUpdate {
    linked_chunk_id: OwnedLinkedChunkId,

    updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
}

impl RoomEventCacheLinkedChunkUpdate {
    pub fn events(self) -> impl DoubleEndedIterator<Item = TimelineEvent> {
        use itertools::Either;
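        // Only `PushItems` and `ReplaceItem` updates carry events; every other
        // update kind is purely structural and yields nothing here.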
        self.updates.into_iter().flat_map(|update| match update {
            linked_chunk::Update::PushItems { items, .. } => {
                Either::Left(Either::Left(items.into_iter()))
            }
            linked_chunk::Update::ReplaceItem { item, .. } => {
                Either::Left(Either::Right(std::iter::once(item)))
            }
            linked_chunk::Update::RemoveItem { .. }
            | linked_chunk::Update::DetachLastItems { .. }
            | linked_chunk::Update::StartReattachItems
            | linked_chunk::Update::EndReattachItems
            | linked_chunk::Update::NewItemsChunk { .. }
            | linked_chunk::Update::NewGapChunk { .. }
            | linked_chunk::Update::RemoveChunk(..)
            | linked_chunk::Update::Clear => {
                Either::Right(std::iter::empty())
            }
        })
    }
}

#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
    MoveReadMarkerTo {
        event_id: OwnedEventId,
    },

    UpdateMembers {
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    },

    UpdateTimelineEvents {
        diffs: Vec<VectorDiff<TimelineEvent>>,

        origin: EventsOrigin,
    },

    AddEphemeralEvents {
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    },
}

#[derive(Debug, Clone)]
pub enum EventsOrigin {
    Sync,

    Pagination,

    Cache,
}

#[cfg(test)]
mod tests {
    use std::{ops::Not, sync::Arc, time::Duration};

    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::{
        RoomState,
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
    };
    use matrix_sdk_test::{
        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
    };
    use ruma::{event_id, room_id, serde::Raw, user_id};
    use serde_json::json;
    use tokio::time::sleep;

    use super::{EventCacheError, RoomEventCacheGenericUpdate, RoomEventCacheUpdate};
    use crate::test_utils::{
        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
    };

    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    #[async_test]
    async fn test_uniq_read_marker() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();

        assert!(events.is_empty());

        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
            .await
            .unwrap();

        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        assert!(stream.recv().now_or_never().is_none());

        assert!(generic_stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
        client.base_client().get_or_create_room(room_id2, RoomState::Joined);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
        updates.joined.insert(room_id2.to_owned(), joined_room_update2);

        event_cache.inner.handle_room_updates(updates).await.unwrap();

        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.find_event(eid1).await.unwrap().unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.find_event(eid2).await.unwrap().unwrap();
        assert_event_matches_msg(&found2, "you");

        assert!(room_event_cache.find_event(eid3).await.unwrap().is_none());
    }

    #[async_test]
    async fn test_save_event() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;

        assert!(room_event_cache.find_event(event_id).await.unwrap().is_some());
    }

    #[async_test]
    async fn test_generic_update_when_loading_rooms() {
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id_0 = room_id!("!raclette:patate.ch");
        let room_id_1 = room_id!("!fondue:patate.ch");

        let event_factory = EventFactory::new().room(room_id_0).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id_0),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        {
            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();

            assert_matches!(
                generic_stream.recv().await,
                Ok(RoomEventCacheGenericUpdate { room_id }) => {
                    assert_eq!(room_id, room_id_0);
                }
            );
        }

        {
            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();

            assert!(generic_stream.recv().now_or_never().is_none());
        }
    }

    #[async_test]
    async fn test_generic_update_when_paginating_room() {
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_factory = EventFactory::new().room(room_id).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(2)),
                        new: ChunkIdentifier::new(3),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(3), 0),
                        items: vec![
                            event_factory
                                .text_msg("world")
                                .sender(user)
                                .event_id(event_id!("$ev1"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let pagination = room_event_cache.pagination();

        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert_eq!(pagination_outcome.events.len(), 1);
        assert!(pagination_outcome.reached_start.not());
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.events.is_empty());
        assert!(pagination_outcome.reached_start.not());
        assert!(generic_stream.recv().now_or_never().is_none());

        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.reached_start);
        assert!(generic_stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_for_room_when_room_is_not_found() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        assert_matches!(
            event_cache.for_room(room_id).await,
            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
                assert_eq!(room_id, not_found_room_id);
            }
        );

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        assert!(event_cache.for_room(room_id).await.is_ok());
    }

    #[cfg(not(target_family = "wasm"))]
    #[async_test]
    async fn test_no_refcycle_event_cache_tasks() {
        let client = MockClientBuilder::new(None).build().await;

        sleep(Duration::from_secs(1)).await;

        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
        assert_eq!(event_cache_weak.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        sleep(Duration::from_secs(1)).await;

        assert_eq!(
            event_cache_weak.strong_count(),
            0,
            "Too many strong references to the event cache {}",
            event_cache_weak.strong_count()
        );
    }
}