#![forbid(missing_docs)]

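//! The event cache is an abstraction layer, sitting between the Rust SDK and
//! a final client, that acts as a global observer of all the rooms, gathering
//! and inferring some extra useful information about each room. In
//! particular, this doesn't require subscribing to a specific room to get
//! access to this information.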
use std::{
    collections::{BTreeMap, HashMap},
    fmt,
    sync::{Arc, OnceLock, Weak},
};

use eyeball::{SharedObservable, Subscriber};
use eyeball_im::VectorDiff;
use futures_util::future::{join_all, try_join_all};
use matrix_sdk_base::{
    ThreadingSupport,
    cross_process_lock::CrossProcessLockError,
    deserialized_responses::{AmbiguityChange, TimelineEvent},
    event_cache::{
        Gap,
        store::{EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockState},
    },
    linked_chunk::{self, OwnedLinkedChunkId, lazy_loader::LazyLoaderError},
    serde_helpers::extract_thread_root_from_content,
    sync::RoomUpdates,
    task_monitor::BackgroundTaskHandle,
    timer,
};
use ruma::{
    OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId, events::AnySyncEphemeralRoomEvent,
    serde::Raw,
};
use tokio::{
    select,
    sync::{
        Mutex, RwLock,
        broadcast::{Receiver, Sender, channel, error::RecvError},
        mpsc,
    },
};
use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, trace, warn};

use crate::{
    Client,
    client::{ClientInner, WeakClient},
    event_cache::room::RoomEventCacheStateLock,
    send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
};

mod deduplicator;
mod pagination;
#[cfg(feature = "e2e-encryption")]
mod redecryptor;
mod room;

pub use pagination::{RoomPagination, RoomPaginationStatus};
#[cfg(feature = "e2e-encryption")]
pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport};
pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate};

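/// All the errors that can happen in the event cache.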
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] instance hasn't subscribed to sync responses yet.
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// The room hasn't been found in the client.
    #[error("Room `{room_id}` is not found.")]
    RoomNotFound {
        /// The identifier of the room that wasn't found.
        room_id: OwnedRoomId,
    },

    /// An error happened while back-paginating.
    #[error(transparent)]
    BackpaginationError(Box<crate::Error>),

    /// A back-pagination for this room was already running.
    #[error("We were already back-paginating.")]
    AlreadyBackpaginating,

    /// An error happened when interacting with storage.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happened when locking the storage.
    #[error(transparent)]
    LockingStorage(#[from] CrossProcessLockError),

    /// The client owning this event cache has been dropped.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happened when lazily loading a linked chunk.
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),

    /// The linked chunk metadata loaded from storage is invalid.
    #[error("the linked chunk metadata is invalid: {details}")]
    InvalidLinkedChunkMetadata {
        /// Details about the error.
        details: String,
    },
}

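/// A result using the [`EventCacheError`].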
pub type Result<T> = std::result::Result<T, EventCacheError>;

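/// Holds the handles to the tasks spawned by the [`EventCache`], so they can
/// be aborted when this struct is dropped.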
pub struct EventCacheDropHandles {
    /// Task that listens to room updates.
    listen_updates_task: BackgroundTaskHandle,

    /// Task that listens to updates to the user's ignored list.
    ignore_user_list_update_task: BackgroundTaskHandle,

    /// Task that automatically shrinks the linked chunks of rooms without
    /// subscribers.
    auto_shrink_linked_chunk_task: BackgroundTaskHandle,

    /// The redecryptor, kept alive for the lifetime of the event cache.
    #[cfg(feature = "e2e-encryption")]
    _redecryptor: redecryptor::Redecryptor,
}

impl fmt::Debug for EventCacheDropHandles {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
    }
}

impl Drop for EventCacheDropHandles {
    fn drop(&mut self) {
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
        self.auto_shrink_linked_chunk_task.abort();
    }
}

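/// An event cache, providing lots of useful functionality for clients.
///
/// Cloning is shallow, and thus is cheap to do.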
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache.
    inner: Arc<EventCacheInner>,
}

impl fmt::Debug for EventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCache").finish_non_exhaustive()
    }
}

impl EventCache {
    /// Create a new [`EventCache`] for the given client.
    pub(crate) fn new(client: &Arc<ClientInner>, event_cache_store: EventCacheStoreLock) -> Self {
        let (generic_update_sender, _) = channel(128);
        let (linked_chunk_update_sender, _) = channel(128);

        let weak_client = WeakClient::from_inner(client);

        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(128);
        let thread_subscriber_task = client
            .task_monitor
            .spawn_background_task(
                "event_cache::thread_subscriber",
                Self::thread_subscriber_task(
                    weak_client.clone(),
                    linked_chunk_update_sender.clone(),
                    thread_subscriber_sender,
                ),
            )
            .abort_on_drop();

        #[cfg(feature = "experimental-search")]
        let search_indexing_task = client
            .task_monitor
            .spawn_background_task(
                "event_cache::search_indexing",
                Self::search_indexing_task(weak_client.clone(), linked_chunk_update_sender.clone()),
            )
            .abort_on_drop();

        #[cfg(feature = "e2e-encryption")]
        let redecryption_channels = redecryptor::RedecryptorChannels::new();

        Self {
            inner: Arc::new(EventCacheInner {
                client: weak_client,
                store: event_cache_store,
                multiple_room_updates_lock: Default::default(),
                by_room: Default::default(),
                drop_handles: Default::default(),
                auto_shrink_sender: Default::default(),
                generic_update_sender,
                linked_chunk_update_sender,
                _thread_subscriber_task: thread_subscriber_task,
                #[cfg(feature = "experimental-search")]
                _search_indexing_task: search_indexing_task,
                #[cfg(feature = "e2e-encryption")]
                redecryption_channels,
                thread_subscriber_receiver,
            }),
        }
    }

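    /// Subscribe to the internal notifications sent whenever the thread
    /// subscriber task has subscribed to a new thread.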
    #[doc(hidden)]
    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
        self.inner.thread_subscriber_receiver.resubscribe()
    }

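    /// Starts subscribing the [`EventCache`] to sync responses, if not done
    /// before.
    ///
    /// Re-entrant safe: calling this multiple times will spawn the background
    /// tasks only once.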
    pub fn subscribe(&self) -> Result<()> {
        let client = self.inner.client()?;

        let _ = self.inner.drop_handles.get_or_init(|| {
            let task_monitor = client.task_monitor();

            let listen_updates_task = task_monitor.spawn_background_task(
                "event_cache::listen_updates",
                Self::listen_task(self.inner.clone(), client.subscribe_to_all_room_updates()),
            );

            let ignore_user_list_update_task = task_monitor.spawn_background_task(
                "event_cache::ignore_user_list_update_task",
                Self::ignore_user_list_update_task(
                    self.inner.clone(),
                    client.subscribe_to_ignore_user_list_changes(),
                ),
            );

            let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);

            // Store the sender, so that room event caches created later can send
            // auto-shrink requests.
            self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);

            let auto_shrink_linked_chunk_task = task_monitor.spawn_background_task(
                "event_cache::auto_shrink_linked_chunk_task",
                Self::auto_shrink_linked_chunk_task(
                    Arc::downgrade(&self.inner),
                    auto_shrink_receiver,
                ),
            );

            #[cfg(feature = "e2e-encryption")]
            let redecryptor = {
                let receiver = self
                    .inner
                    .redecryption_channels
                    .decryption_request_receiver
                    .lock()
                    .take()
                    .expect(
                        "We should have initialized the channel, and subscribing should happen only once",
                    );

                redecryptor::Redecryptor::new(
                    &client,
                    Arc::downgrade(&self.inner),
                    receiver,
                    &self.inner.linked_chunk_update_sender,
                )
            };

            Arc::new(EventCacheDropHandles {
                listen_updates_task,
                ignore_user_list_update_task,
                auto_shrink_linked_chunk_task,
                #[cfg(feature = "e2e-encryption")]
                _redecryptor: redecryptor,
            })
        });

        Ok(())
    }


    #[instrument(skip_all)]
    async fn ignore_user_list_update_task(
        inner: Arc<EventCacheInner>,
        mut ignore_user_list_stream: Subscriber<Vec<String>>,
    ) {
        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
        span.follows_from(Span::current());

        async move {
            while ignore_user_list_stream.next().await.is_some() {
                info!("Received an ignore user list change");
                if let Err(err) = inner.clear_all_rooms().await {
                    error!("when clearing room storage after ignore user list change: {err}");
                }
            }
            info!("Ignore user list stream has closed");
        }
        .instrument(span)
        .await;
    }

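    /// Handle a set of room updates, as received in a sync response.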
    #[doc(hidden)]
    pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        self.inner.handle_room_updates(updates).await
    }

    #[instrument(skip_all)]
    async fn listen_task(
        inner: Arc<EventCacheInner>,
        mut room_updates_feed: Receiver<RoomUpdates>,
    ) {
        trace!("Spawning the listen task");
        loop {
            match room_updates_feed.recv().await {
                Ok(updates) => {
                    trace!("Receiving `RoomUpdates`");

                    if let Err(err) = inner.handle_room_updates(updates).await {
                        match err {
                            EventCacheError::ClientDropped => {
                                // The client has been dropped: exit gracefully.
                                info!(
                                    "Closing the event cache global listen task because client dropped"
                                );
                                break;
                            }
                            err => {
                                error!("Error when handling room updates: {err}");
                            }
                        }
                    }
                }

                Err(RecvError::Lagged(num_skipped)) => {
                    // We could have missed some events, and we have no way to reconcile at
                    // the moment, so clear everything we know.
                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
                    if let Err(err) = inner.clear_all_rooms().await {
                        error!("when clearing storage after lag in listen_task: {err}");
                    }
                }

                Err(RecvError::Closed) => {
                    // The sender has shut down: exit gracefully.
                    info!("Closing the event cache global listen task because receiver closed");
                    break;
                }
            }
        }
    }

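    /// Background task listening for auto-shrink notifications.
    ///
    /// A notification is sent over this channel whenever a room's last
    /// subscriber may have gone away; if the room has no subscribers anymore,
    /// its in-memory linked chunk is shrunk back to its last chunk, to
    /// reclaim memory.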
    #[instrument(skip_all)]
    async fn auto_shrink_linked_chunk_task(
        inner: Weak<EventCacheInner>,
        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
    ) {
        while let Some(room_id) = rx.recv().await {
            trace!(for_room = %room_id, "received notification to shrink");

            let Some(inner) = inner.upgrade() else {
                return;
            };

            let room = match inner.for_room(&room_id).await {
                Ok(room) => room,
                Err(err) => {
                    warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
                    continue;
                }
            };

            trace!("waiting for state lock…");
            let mut state = match room.inner.state.write().await {
                Ok(state) => state,
                Err(err) => {
                    warn!(for_room = %room_id, "Failed to get the `RoomEventCacheStateLock`: {err}");
                    continue;
                }
            };

            match state.auto_shrink_if_no_subscribers().await {
                Ok(diffs) => {
                    if let Some(diffs) = diffs {
                        if !diffs.is_empty() {
                            let _ = room.inner.update_sender.send(
                                RoomEventCacheUpdate::UpdateTimelineEvents {
                                    diffs,
                                    origin: EventsOrigin::Cache,
                                },
                            );
                        }
                    } else {
                        debug!("auto-shrinking didn't happen");
                    }
                }

                Err(err) => {
                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
                }
            }
        }

        info!("Auto-shrink linked chunk task has been closed, exiting");
    }

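    /// Check whether the event cache has subscribed to sync responses yet.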
    pub fn has_subscribed(&self) -> bool {
        self.inner.drop_handles.get().is_some()
    }

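    /// Return a room-specific view over the [`EventCache`], along with the
    /// drop handles keeping the background tasks alive.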
    pub(crate) async fn for_room(
        &self,
        room_id: &RoomId,
    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
            return Err(EventCacheError::NotSubscribedYet);
        };

        let room = self.inner.for_room(room_id).await?;

        Ok((room, drop_handles))
    }

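    /// Clear all the events from the in-memory and persisted caches, for all
    /// rooms.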
    pub async fn clear_all_rooms(&self) -> Result<()> {
        self.inner.clear_all_rooms().await
    }

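    /// Subscribe to room _generic_ updates.
    ///
    /// A generic update is emitted every time a room's linked chunk of events
    /// is modified, whatever the source of the modification. A minimal
    /// consumer sketch (assuming an already set-up `client`):
    ///
    /// ```ignore
    /// let mut updates = client.event_cache().subscribe_to_room_generic_updates();
    ///
    /// while let Ok(update) = updates.recv().await {
    ///     println!("room {} has been updated", update.room_id);
    /// }
    /// ```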
    pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
        self.inner.generic_update_sender.subscribe()
    }

    /// React to a linked chunk update by subscribing the user to a thread, if
    /// needs be.
    ///
    /// Returns `false` if the client is shutting down and the calling task
    /// should exit, `true` otherwise.
    #[instrument(skip(client, thread_subscriber_sender))]
    async fn handle_thread_subscriber_linked_chunk_update(
        client: &WeakClient,
        thread_subscriber_sender: &Sender<()>,
        up: RoomEventCacheLinkedChunkUpdate,
    ) -> bool {
        let Some(client) = client.get() else {
            debug!("Client is shutting down, exiting thread subscriber task");
            return false;
        };

        let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
            trace!("received an update for a non-thread linked chunk, ignoring");
            return true;
        };

        let Some(room) = client.get_room(room_id) else {
            warn!(%room_id, "unknown room");
            return true;
        };

        let thread_root = thread_root.clone();

        let mut new_events = up.events().peekable();

        if new_events.peek().is_none() {
            // No new events: nothing to do.
            return true;
        }

        // Compute the push context without taking thread subscriptions into account,
        // since we're in the process of deciding whether to subscribe to a thread.
        let with_thread_subscriptions = false;

        let Some(push_context) = room
            .push_context_internal(with_thread_subscriptions)
            .await
            .inspect_err(|err| {
                warn!("Failed to get push context for threads: {err}");
            })
            .ok()
            .flatten()
        else {
            warn!("Missing push context for thread subscriptions.");
            return true;
        };

        let mut subscribe_up_to = None;

        // Find the most recent event that would trigger a notification, if any, and
        // subscribe up to it.
        for ev in new_events.rev() {
            if push_context
                .for_event(ev.raw())
                .await
                .into_iter()
                .any(|action| action.should_notify())
            {
                let Some(event_id) = ev.event_id() else {
                    continue;
                };
                subscribe_up_to = Some(event_id);
                break;
            }
        }

        if let Some(event_id) = subscribe_up_to {
            trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
            if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
                warn!(%err, "Failed to subscribe to thread");
            } else {
                let _ = thread_subscriber_sender.send(());
            }
        }

        true
    }

    /// React to a send queue update by subscribing the user to a thread, if
    /// needs be.
    ///
    /// Returns `false` if the client is shutting down and the calling task
    /// should exit, `true` otherwise.
    #[instrument(skip(client, thread_subscriber_sender))]
    async fn handle_thread_subscriber_send_queue_update(
        client: &WeakClient,
        thread_subscriber_sender: &Sender<()>,
        events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
        up: SendQueueUpdate,
    ) -> bool {
        let Some(client) = client.get() else {
            debug!("Client is shutting down, exiting thread subscriber task");
            return false;
        };

        let room_id = up.room_id;
        let Some(room) = client.get_room(&room_id) else {
            warn!(%room_id, "unknown room");
            return true;
        };

        let (thread_root, subscribe_up_to) = match up.update {
            RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
                match local_echo.content {
                    LocalEchoContent::Event { serialized_event, .. } => {
                        if let Some(thread_root) =
                            extract_thread_root_from_content(serialized_event.into_raw().0)
                        {
                            events_being_sent.insert(local_echo.transaction_id, thread_root);
                        }
                    }
                    LocalEchoContent::React { .. } => {
                        // Nothing to do for reactions.
                    }
                }
                return true;
            }

            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
                events_being_sent.remove(&transaction_id);
                return true;
            }

            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
                if let Some(thread_root) =
                    extract_thread_root_from_content(new_content.into_raw().0)
                {
                    events_being_sent.insert(transaction_id, thread_root);
                } else {
                    // The replacement may have removed the thread relationship.
                    events_being_sent.remove(&transaction_id);
                }
                return true;
            }

            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
                if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
                    (thread_root, event_id)
                } else {
                    trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
                    return true;
                }
            }

            RoomSendQueueUpdate::SendError { .. }
            | RoomSendQueueUpdate::RetryEvent { .. }
            | RoomSendQueueUpdate::MediaUpload { .. } => {
                // Nothing to do for these updates.
                return true;
            }
        };

        trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");
        if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await
        {
            warn!(%err, "Failed to subscribe to thread");
        } else {
            let _ = thread_subscriber_sender.send(());
        }

        true
    }

    #[instrument(skip_all)]
    async fn thread_subscriber_task(
        client: WeakClient,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
        thread_subscriber_sender: Sender<()>,
    ) {
        let mut send_q_rx = if let Some(client) = client.get() {
            if !client.enabled_thread_subscriptions() {
                trace!("Thread subscriptions are not enabled, not spawning thread subscriber task");
                return;
            }

            client.send_queue().subscribe()
        } else {
            trace!("Client is shutting down, not spawning thread subscriber task");
            return;
        };

        let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();

        // Maps the transaction id of in-flight local echoes to their thread root, so
        // they can be matched against `SentEvent` updates later.
        let mut events_being_sent = HashMap::new();

        loop {
            select! {
                res = send_q_rx.recv() => {
                    match res {
                        Ok(up) => {
                            if !Self::handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
                                break;
                            }
                        }
                        Err(RecvError::Closed) => {
                            debug!("Send queue update channel has been closed, exiting thread subscriber task");
                            break;
                        }
                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(num_skipped, "Lagged behind send queue updates");
                        }
                    }
                }

                res = linked_chunk_rx.recv() => {
                    match res {
                        Ok(up) => {
                            if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
                                break;
                            }
                        }
                        Err(RecvError::Closed) => {
                            debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
                            break;
                        }
                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(num_skipped, "Lagged behind linked chunk updates");
                        }
                    }
                }
            }
        }
    }

    /// Long-running task indexing events for full-text search, as they are
    /// received via linked chunk updates.
    #[cfg(feature = "experimental-search")]
    #[instrument(skip_all)]
    async fn search_indexing_task(
        client: WeakClient,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
    ) {
        let mut linked_chunk_update_receiver = linked_chunk_update_sender.subscribe();

        loop {
            match linked_chunk_update_receiver.recv().await {
                Ok(room_ec_lc_update) => {
                    let OwnedLinkedChunkId::Room(room_id) =
                        room_ec_lc_update.linked_chunk_id.clone()
                    else {
                        trace!("Received non-room updates, ignoring.");
                        continue;
                    };

                    let mut timeline_events = room_ec_lc_update.events().peekable();

                    if timeline_events.peek().is_none() {
                        continue;
                    }

                    let Some(client) = client.get() else {
                        trace!("Client is shutting down, exiting search indexing task");
                        return;
                    };

                    let (room_cache, _drop_handles) =
                        match client.event_cache().for_room(&room_id).await {
                            Ok(found) => found,
                            Err(err) => {
                                warn!(for_room = %room_id, "Failed to get RoomEventCache: {err}");
                                continue;
                            }
                        };

                    let Some(room) = client.get_room(&room_id) else {
                        warn!(get_room = %room_id, "Failed to get room while indexing");
                        continue;
                    };
                    let redaction_rules =
                        room.clone_info().room_version_rules_or_default().redaction;

                    let mut search_index_guard = client.search_index().lock().await;

                    if let Err(err) = search_index_guard
                        .bulk_handle_timeline_event(
                            timeline_events,
                            &room_cache,
                            &room_id,
                            &redaction_rules,
                        )
                        .await
                    {
                        error!("Failed to handle events for indexing: {err}")
                    }
                }
                Err(RecvError::Closed) => {
                    debug!(
                        "Linked chunk update channel has been closed, exiting search indexing task"
                    );
                    break;
                }
                Err(RecvError::Lagged(num_skipped)) => {
                    warn!(num_skipped, "Lagged behind linked chunk updates");
                }
            }
        }
    }
}

struct EventCacheInner {
    /// A weak reference to the inner client, useful when trying to get a
    /// handle on the owning client.
    client: WeakClient,

    /// Reference to the underlying store.
    store: EventCacheStoreLock,

    /// A lock used when many rooms must be updated at once, to serialize the
    /// handling of bulk room updates.
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-filled cache of live [`RoomEventCache`], one per room.
    by_room: RwLock<HashMap<OwnedRoomId, RoomEventCache>>,

    /// Handles to the background tasks, kept alive for the lifetime of the
    /// event cache.
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// A sender for notifications that a room *may* need to be auto-shrunk.
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,

    /// A sender for room generic updates.
    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

    /// A sender for linked chunk updates.
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// Background task automatically subscribing the user to threads, if
    /// needs be.
    _thread_subscriber_task: BackgroundTaskHandle,

    /// Background task indexing events for full-text search.
    #[cfg(feature = "experimental-search")]
    _search_indexing_task: BackgroundTaskHandle,

    /// A receiver notified every time the thread subscriber task has
    /// subscribed to a thread.
    thread_subscriber_receiver: Receiver<()>,

    /// Channels used to communicate with the redecryptor task.
    #[cfg(feature = "e2e-encryption")]
    redecryption_channels: redecryptor::RedecryptorChannels,
}

type AutoShrinkChannelPayload = OwnedRoomId;

impl EventCacheInner {
    fn client(&self) -> Result<Client> {
        self.client.get().ok_or(EventCacheError::ClientDropped)
    }

    async fn clear_all_rooms(&self) -> Result<()> {
        // Hold the lock on the room list, and the state lock of every room, across
        // the whole operation, so nothing can observe a half-cleared state.
        let rooms = self.by_room.write().await;

        let room_locks = join_all(
            rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
        )
        .await;

        // Clear the storage for all the rooms, including those not loaded in memory
        // yet.
        let store_guard = match self.store.lock().await? {
            EventCacheStoreLockState::Clean(store_guard) => store_guard,
            EventCacheStoreLockState::Dirty(store_guard) => store_guard,
        };
        store_guard.clear_all_linked_chunks().await?;

        // Reset the in-memory state of every room, and propagate updates to
        // observers.
        try_join_all(room_locks.into_iter().map(|(room, state_guard)| async move {
            let mut state_guard = state_guard?;
            let updates_as_vector_diffs = state_guard.reset().await?;

            let _ = room.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: updates_as_vector_diffs,
                origin: EventsOrigin::Cache,
            });

            let _ = room
                .inner
                .generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() });

            Ok::<_, EventCacheError>(())
        }))
        .await?;

        Ok(())
    }

    #[instrument(skip(self, updates))]
    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        // Take the lock before anything else, so that bulk room updates are handled
        // one at a time.
        let _lock = {
            let _timer = timer!("Taking the `multiple_room_updates_lock`");
            self.multiple_room_updates_lock.lock().await
        };

        // Left rooms.
        for (room_id, left_room_update) in updates.left {
            let Ok(room) = self.for_room(&room_id).await else {
                error!(?room_id, "Room must exist");
                continue;
            };

            if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
                // Non-fatal error, try to continue to the next room.
                error!("handling left room update: {err}");
            }
        }

        // Joined rooms.
        for (room_id, joined_room_update) in updates.joined {
            trace!(?room_id, "Handling a `JoinedRoomUpdate`");

            let Ok(room) = self.for_room(&room_id).await else {
                error!(?room_id, "Room must exist");
                continue;
            };

            if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
                // Non-fatal error, try to continue to the next room.
                error!(%room_id, "handling joined room update: {err}");
            }
        }

        Ok(())
    }

    /// Return a room-specific view over the [`EventCache`].
    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
        // write lock.
        let by_room_guard = self.by_room.read().await;

        match by_room_guard.get(room_id) {
            Some(room) => Ok(room.clone()),

            None => {
                // Slow path: the entry doesn't exist; let's acquire a write lock.
                drop(by_room_guard);
                let mut by_room_guard = self.by_room.write().await;

                // In the meanwhile, some other caller might have obtained write access and
                // inserted the room already. Check for that.
                if let Some(room) = by_room_guard.get(room_id) {
                    return Ok(room.clone());
                }

                let pagination_status =
                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });

                let Some(client) = self.client.get() else {
                    return Err(EventCacheError::ClientDropped);
                };

                let room = client
                    .get_room(room_id)
                    .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
                let room_version_rules = room.clone_info().room_version_rules_or_default();

                let enabled_thread_support = matches!(
                    client.base_client().threading_support,
                    ThreadingSupport::Enabled { .. }
                );

                let update_sender = Sender::new(32);

                let room_state = RoomEventCacheStateLock::new(
                    room_id.to_owned(),
                    room_version_rules,
                    enabled_thread_support,
                    update_sender.clone(),
                    self.generic_update_sender.clone(),
                    self.linked_chunk_update_sender.clone(),
                    self.store.clone(),
                    pagination_status.clone(),
                )
                .await?;

                let timeline_is_not_empty =
                    room_state.read().await?.room_linked_chunk().revents().next().is_some();

                // SAFETY: we must have subscribed before reaching this code, otherwise
                // something is very wrong.
                let auto_shrink_sender =
                    self.auto_shrink_sender.get().cloned().expect(
                        "we must have called `EventCache::subscribe()` before calling here.",
                    );

                let room_event_cache = RoomEventCache::new(
                    self.client.clone(),
                    room_state,
                    pagination_status,
                    room_id.to_owned(),
                    auto_shrink_sender,
                    update_sender,
                    self.generic_update_sender.clone(),
                );

                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());

                // The room event cache has just been loaded in memory. If its linked chunk
                // isn't empty, the room already has events: notify the observers.
                if timeline_is_not_empty {
                    let _ = self
                        .generic_update_sender
                        .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
                }

                Ok(room_event_cache)
            }
        }
    }
}

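/// The result of a single back-pagination request.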
#[derive(Debug)]
pub struct BackPaginationOutcome {
    /// Did the back-pagination reach the start of the timeline?
    pub reached_start: bool,

    /// All the events that have been returned in the back-pagination.
    ///
    /// Events are presented in reverse order: the first element of the vec,
    /// if present, is the most "recent" event from the chunk (put another
    /// way, events are in anti-chronological order).
    pub events: Vec<TimelineEvent>,
}

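/// A generic update about a room: its linked chunk of events has been
/// modified in some way.
///
/// To receive detailed updates, use [`RoomEventCache::subscribe`] instead.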
#[derive(Clone, Debug)]
pub struct RoomEventCacheGenericUpdate {
    /// The room that has been updated.
    pub room_id: OwnedRoomId,
}

/// An update sent whenever a linked chunk (for a room or a thread) is
/// modified.
#[derive(Clone, Debug)]
struct RoomEventCacheLinkedChunkUpdate {
    /// The linked chunk affected by the update.
    linked_chunk_id: OwnedLinkedChunkId,

    /// The raw linked chunk updates that happened.
    updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
}

impl RoomEventCacheLinkedChunkUpdate {
    /// Return all the new events propagated by this update.
    pub fn events(self) -> impl DoubleEndedIterator<Item = TimelineEvent> {
        use itertools::Either;
        self.updates.into_iter().flat_map(|update| match update {
            linked_chunk::Update::PushItems { items, .. } => {
                Either::Left(Either::Left(items.into_iter()))
            }
            linked_chunk::Update::ReplaceItem { item, .. } => {
                Either::Left(Either::Right(std::iter::once(item)))
            }
            linked_chunk::Update::RemoveItem { .. }
            | linked_chunk::Update::DetachLastItems { .. }
            | linked_chunk::Update::StartReattachItems
            | linked_chunk::Update::EndReattachItems
            | linked_chunk::Update::NewItemsChunk { .. }
            | linked_chunk::Update::NewGapChunk { .. }
            | linked_chunk::Update::RemoveChunk(..)
            | linked_chunk::Update::Clear => {
                // These updates don't carry new events.
                Either::Right(std::iter::empty())
            }
        })
    }
}

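/// An update related to events that happened in a room.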
#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
    /// The fully read marker has moved to a different event.
    MoveReadMarkerTo {
        /// Event at which the read marker is now pointing.
        event_id: OwnedEventId,
    },

    /// The members have changed.
    UpdateMembers {
        /// Collection of ambiguity changes that room member events trigger,
        /// keyed by the identifier of the member event.
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    },

    /// The room has received updates for the timeline as _diffs_.
    UpdateTimelineEvents {
        /// Diffs to apply to the timeline.
        diffs: Vec<VectorDiff<TimelineEvent>>,

        /// Where the diffs are coming from.
        origin: EventsOrigin,
    },

    /// The room has received new ephemeral events.
    AddEphemeralEvents {
        /// The ephemeral events received.
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    },
}

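/// Indicate where events are coming from.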
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events are coming from a sync.
    Sync,

    /// Events are coming from pagination.
    Pagination,

    /// Events are coming from the cache.
    Cache,
}

#[cfg(test)]
mod tests {
    use std::{ops::Not, sync::Arc, time::Duration};

    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::{
        RoomState,
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
    };
    use matrix_sdk_test::{
        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
    };
    use ruma::{event_id, room_id, serde::Raw, user_id};
    use serde_json::json;
    use tokio::time::sleep;

    use super::{EventCacheError, RoomEventCacheGenericUpdate, RoomEventCacheUpdate};
    use crate::test_utils::{
        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
    };

    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // Requesting a room event cache before subscribing the event cache fails.
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    #[async_test]
    async fn test_uniq_read_marker() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();

        assert!(events.is_empty());

        // When sending multiple times the same read marker event…
        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
            .await
            .unwrap();

        // … there's only one read marker update.
        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        assert!(stream.recv().now_or_never().is_none());

        // No generic update, because a read marker change doesn't touch the linked
        // chunk.
        assert!(generic_stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
        client.base_client().get_or_create_room(room_id2, RoomState::Joined);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Insert two rooms with a few events.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
        updates.joined.insert(room_id2.to_owned(), joined_room_update2);

        // Have the event cache handle them.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // Now retrieve all the events one by one.
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.find_event(eid1).await.unwrap().unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.find_event(eid2).await.unwrap().unwrap();
        assert_event_matches_msg(&found2, "you");

        // The third event is in the other room, so it can't be found in this one.
        assert!(room_event_cache.find_event(eid3).await.unwrap().is_none());
    }

    #[async_test]
    async fn test_save_event() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;

        // A saved event can be found again.
        assert!(room_event_cache.find_event(event_id).await.unwrap().is_some());
    }

    #[async_test]
    async fn test_generic_update_when_loading_rooms() {
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id_0 = room_id!("!raclette:patate.ch");
        let room_id_1 = room_id!("!fondue:patate.ch");

        let event_factory = EventFactory::new().room(room_id_0).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id_0),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // The first room has an event in storage: loading it emits a generic update.
        {
            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();

            assert_matches!(
                generic_stream.recv().await,
                Ok(RoomEventCacheGenericUpdate { room_id }) => {
                    assert_eq!(room_id, room_id_0);
                }
            );
        }

        // The second room is empty: loading it emits no generic update.
        {
            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();

            assert!(generic_stream.recv().now_or_never().is_none());
        }
    }

    #[async_test]
    async fn test_generic_update_when_paginating_room() {
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_factory = EventFactory::new().room(room_id).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(2)),
                        new: ChunkIdentifier::new(3),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(3), 0),
                        items: vec![
                            event_factory
                                .text_msg("world")
                                .sender(user)
                                .event_id(event_id!("$ev1"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Loading the room emits a generic update.
        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let pagination = room_event_cache.pagination();

        // Paginate once: an event is loaded, and a generic update is emitted.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert_eq!(pagination_outcome.events.len(), 1);
        assert!(pagination_outcome.reached_start.not());
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        // Paginate again: an empty chunk is loaded, so no event and no generic
        // update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.events.is_empty());
        assert!(pagination_outcome.reached_start.not());
        assert!(generic_stream.recv().now_or_never().is_none());

        // Paginate a last time: the start of the timeline is reached.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.reached_start);
        assert!(generic_stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_for_room_when_room_is_not_found() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // The room is not known to the client yet: a `RoomNotFound` error is
        // expected.
        assert_matches!(
            event_cache.for_room(room_id).await,
            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
                assert_eq!(room_id, not_found_room_id);
            }
        );

        // Once the room is created, the event cache can provide a view for it.
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        assert!(event_cache.for_room(room_id).await.is_ok());
    }

    #[cfg(not(target_family = "wasm"))]
    #[async_test]
    async fn test_no_refcycle_event_cache_tasks() {
        let client = MockClientBuilder::new(None).build().await;

        // Give the tasks spawned at client creation some time to settle.
        sleep(Duration::from_secs(1)).await;

        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
        assert_eq!(event_cache_weak.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Give the background tasks some time to observe the shutdown.
        sleep(Duration::from_secs(1)).await;

        // If the event cache tasks held strong references to the event cache, the
        // strong count would never reach zero.
        assert_eq!(
            event_cache_weak.strong_count(),
            0,
            "Too many strong references to the event cache {}",
            event_cache_weak.strong_count()
        );
    }
}