#![forbid(missing_docs)]

//! The event cache is an abstraction layer sitting between the Rust SDK and
//! the client, acting as a global observer of all the rooms: it caches
//! received events and gathers extra useful information about each room,
//! without requiring a subscription to a specific room.
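//!
//! A minimal usage sketch (`client` is assumed to be a logged-in [`Client`];
//! the room identifier is hypothetical):
//!
//! ```ignore
//! // Subscribe once, after login: this spawns the background tasks that keep
//! // the cache fed from sync responses.
//! let event_cache = client.event_cache();
//! event_cache.subscribe()?;
//!
//! // Then observe a given room.
//! let room = client.get_room(room_id).unwrap();
//! let (room_event_cache, _drop_handles) = room.event_cache().await?;
//! let (initial_events, mut updates) = room_event_cache.subscribe().await;
//! ```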

use std::{
    collections::{BTreeMap, HashMap},
    fmt,
    sync::{Arc, OnceLock, Weak},
};

use eyeball::{SharedObservable, Subscriber};
use eyeball_im::VectorDiff;
use futures_util::future::{join_all, try_join_all};
use matrix_sdk_base::{
    ThreadingSupport,
    cross_process_lock::CrossProcessLockError,
    deserialized_responses::{AmbiguityChange, TimelineEvent},
    event_cache::{
        Gap,
        store::{EventCacheStoreError, EventCacheStoreLock},
    },
    executor::AbortOnDrop,
    linked_chunk::{self, OwnedLinkedChunkId, lazy_loader::LazyLoaderError},
    serde_helpers::extract_thread_root_from_content,
    sync::RoomUpdates,
    timer,
};
use matrix_sdk_common::executor::{JoinHandle, spawn};
use room::RoomEventCacheState;
use ruma::{
    OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId, events::AnySyncEphemeralRoomEvent,
    serde::Raw,
};
use tokio::{
    select,
    sync::{
        Mutex, RwLock,
        broadcast::{Receiver, Sender, channel, error::RecvError},
        mpsc,
    },
};
use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, trace, warn};

use crate::{
    Client,
    client::WeakClient,
    send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
};

mod deduplicator;
mod pagination;
mod room;

pub use pagination::{RoomPagination, RoomPaginationStatus};
pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate};

/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] hasn't been initialized with [`EventCache::subscribe`] yet.
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// The room hasn't been found in the client.
    #[error("Room `{room_id}` is not found.")]
    RoomNotFound {
        /// The ID of the room that wasn't found.
        room_id: OwnedRoomId,
    },

    /// An error happened while back-paginating.
    #[error(transparent)]
    BackpaginationError(Box<crate::Error>),

    /// A back-pagination was already running for this room.
    #[error("We were already back-paginating.")]
    AlreadyBackpaginating,

    /// An error happened when interacting with storage.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happened when attempting to take the cross-process storage lock.
    #[error(transparent)]
    LockingStorage(#[from] CrossProcessLockError),

    /// The [`Client`] that owns this event cache has been dropped.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happened while lazily loading a linked chunk.
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),

    /// The linked chunk metadata loaded from storage is invalid.
    #[error("The linked chunk metadata is invalid: {details}")]
    InvalidLinkedChunkMetadata {
        /// Details about the error.
        details: String,
    },
}

/// A result using the [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;

/// Holds the handles to the tasks spawned by the [`EventCache`]; aborts them
/// when dropped.
pub struct EventCacheDropHandles {
    /// Task that listens to room updates.
    listen_updates_task: JoinHandle<()>,

    /// Task that listens to updates to the user's ignored list.
    ignore_user_list_update_task: JoinHandle<()>,

    /// Task in charge of automatically shrinking room linked chunks.
    auto_shrink_linked_chunk_task: JoinHandle<()>,
}

impl fmt::Debug for EventCacheDropHandles {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
    }
}

impl Drop for EventCacheDropHandles {
    fn drop(&mut self) {
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
        self.auto_shrink_linked_chunk_task.abort();
    }
}

/// An event cache, providing lots of useful functionality for clients.
///
/// Cloning is shallow, and thus is cheap to do.
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache.
    inner: Arc<EventCacheInner>,
}

impl fmt::Debug for EventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCache").finish_non_exhaustive()
    }
}

impl EventCache {
    /// Create a new [`EventCache`] for the given client.
    pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
        let (generic_update_sender, _) = channel(32);
        let (linked_chunk_update_sender, _) = channel(32);

        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(32);
        let thread_subscriber_task = AbortOnDrop::new(spawn(Self::thread_subscriber_task(
            client.clone(),
            linked_chunk_update_sender.clone(),
            thread_subscriber_sender,
        )));

        #[cfg(feature = "experimental-search")]
        let search_indexing_task = AbortOnDrop::new(spawn(Self::search_indexing_task(
            client.clone(),
            linked_chunk_update_sender.clone(),
        )));

        Self {
            inner: Arc::new(EventCacheInner {
                client,
                store: event_cache_store,
                multiple_room_updates_lock: Default::default(),
                by_room: Default::default(),
                drop_handles: Default::default(),
                auto_shrink_sender: Default::default(),
                generic_update_sender,
                linked_chunk_update_sender,
                _thread_subscriber_task: thread_subscriber_task,
                #[cfg(feature = "experimental-search")]
                _search_indexing_task: search_indexing_task,
                thread_subscriber_receiver,
            }),
        }
    }

    /// Subscribe to the updates sent by the thread subscriber task, whenever
    /// it has subscribed the user to a new thread.
    ///
    /// Mostly useful for testing.
    #[doc(hidden)]
    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
        self.inner.thread_subscriber_receiver.resubscribe()
    }

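    /// Starts subscribing the [`EventCache`] to sync responses, if not done
    /// before; this spawns the background tasks, so it must be called before
    /// any call to [`EventCache::for_room`]. Calling it multiple times is a
    /// cheap no-op.
    ///
    /// A minimal sketch (`client` is assumed to be a logged-in [`Client`]):
    ///
    /// ```ignore
    /// let event_cache = client.event_cache();
    /// event_cache.subscribe()?;
    /// assert!(event_cache.has_subscribed());
    /// ```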
    pub fn subscribe(&self) -> Result<()> {
        let client = self.inner.client()?;

        let _ = self.inner.drop_handles.get_or_init(|| {
            // Spawn the task that will listen to all the room updates at once.
            let listen_updates_task = spawn(Self::listen_task(
                self.inner.clone(),
                client.subscribe_to_all_room_updates(),
            ));

            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
                self.inner.clone(),
                client.subscribe_to_ignore_user_list_changes(),
            ));

            let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);

            // The auto-shrink sender is initialized only once, the first time we subscribe.
            self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);

            let auto_shrink_linked_chunk_task = spawn(Self::auto_shrink_linked_chunk_task(
                Arc::downgrade(&self.inner),
                auto_shrink_receiver,
            ));

            Arc::new(EventCacheDropHandles {
                listen_updates_task,
                ignore_user_list_update_task,
                auto_shrink_linked_chunk_task,
            })
        });

        Ok(())
    }

    #[instrument(skip_all)]
    async fn ignore_user_list_update_task(
        inner: Arc<EventCacheInner>,
        mut ignore_user_list_stream: Subscriber<Vec<String>>,
    ) {
        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
        span.follows_from(Span::current());

        async move {
            while ignore_user_list_stream.next().await.is_some() {
                info!("Received an ignore user list change");
                if let Err(err) = inner.clear_all_rooms().await {
                    error!("when clearing room storage after ignore user list change: {err}");
                }
            }
            info!("Ignore user list stream has closed");
        }
        .instrument(span)
        .await;
    }

    /// Handle a set of room updates, as provided by a sync response.
    ///
    /// Publicly exposed (but hidden) for testing and benchmarking purposes.
    #[doc(hidden)]
    pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        self.inner.handle_room_updates(updates).await
    }

    #[instrument(skip_all)]
    async fn listen_task(
        inner: Arc<EventCacheInner>,
        mut room_updates_feed: Receiver<RoomUpdates>,
    ) {
        trace!("Spawning the listen task");
        loop {
            match room_updates_feed.recv().await {
                Ok(updates) => {
                    trace!("Receiving `RoomUpdates`");

                    if let Err(err) = inner.handle_room_updates(updates).await {
                        match err {
                            EventCacheError::ClientDropped => {
                                // The client has been dropped: exit the task.
                                info!(
                                    "Closing the event cache global listen task because client dropped"
                                );
                                break;
                            }
                            err => {
                                error!("Error when handling room updates: {err}");
                            }
                        }
                    }
                }

                Err(RecvError::Lagged(num_skipped)) => {
                    // We've lagged behind the sync updates; the safest thing to do is to
                    // clear the caches, since we don't know what we've missed.
                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
                    if let Err(err) = inner.clear_all_rooms().await {
                        error!("when clearing storage after lag in listen_task: {err}");
                    }
                }

                Err(RecvError::Closed) => {
                    info!("Closing the event cache global listen task because receiver closed");
                    break;
                }
            }
        }
    }

    /// Task in charge of automatically shrinking a room's linked chunk back
    /// to its last chunk, whenever it's notified that the room may not have
    /// any subscriber anymore.
    #[instrument(skip_all)]
    async fn auto_shrink_linked_chunk_task(
        inner: Weak<EventCacheInner>,
        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
    ) {
        while let Some(room_id) = rx.recv().await {
            trace!(for_room = %room_id, "received notification to shrink");

            let Some(inner) = inner.upgrade() else {
                return;
            };

            let room = match inner.for_room(&room_id).await {
                Ok(room) => room,
                Err(err) => {
                    warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
                    continue;
                }
            };

            trace!("waiting for state lock…");
            let mut state = room.inner.state.write().await;

            match state.auto_shrink_if_no_subscribers().await {
                Ok(diffs) => {
                    if let Some(diffs) = diffs {
                        // The shrink happened: propagate the diffs to observers, if any.
                        if !diffs.is_empty() {
                            let _ = room.inner.sender.send(
                                RoomEventCacheUpdate::UpdateTimelineEvents {
                                    diffs,
                                    origin: EventsOrigin::Cache,
                                },
                            );
                        }
                    } else {
                        debug!("auto-shrinking didn't happen");
                    }
                }

                Err(err) => {
                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
                }
            }
        }

        info!("Auto-shrink linked chunk task has been closed, exiting");
    }

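    /// Has the event cache been subscribed to sync responses, i.e. has
    /// [`EventCache::subscribe`] been called at least once?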
    pub fn has_subscribed(&self) -> bool {
        self.inner.drop_handles.get().is_some()
    }

    /// Return the [`RoomEventCache`] for the given room, along with the drop
    /// handles keeping the background tasks alive.
    pub(crate) async fn for_room(
        &self,
        room_id: &RoomId,
    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
            return Err(EventCacheError::NotSubscribedYet);
        };

        let room = self.inner.for_room(room_id).await?;

        Ok((room, drop_handles))
    }

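    /// Clear all the rooms' event data, in memory and in storage.
    ///
    /// Subscribers are notified of the clear-out with a
    /// [`RoomEventCacheUpdate::UpdateTimelineEvents`] carrying the reset
    /// diffs.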
    pub async fn clear_all_rooms(&self) -> Result<()> {
        self.inner.clear_all_rooms().await
    }

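    /// Subscribe to room _generic_ updates.
    ///
    /// A generic update is emitted whenever a room's underlying linked chunk
    /// changes (from a sync, a back-pagination, a clear-out, and so on),
    /// without describing *what* changed. Use [`RoomEventCache::subscribe`]
    /// when the actual diffs are needed.
    ///
    /// A minimal sketch (`event_cache` is assumed to be a subscribed
    /// [`EventCache`]):
    ///
    /// ```ignore
    /// let mut generic_updates = event_cache.subscribe_to_room_generic_updates();
    /// while let Ok(update) = generic_updates.recv().await {
    ///     println!("something happened in {}", update.room_id);
    /// }
    /// ```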
    pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
        self.inner.generic_update_sender.subscribe()
    }

    /// Handle a single linked chunk update on behalf of the thread subscriber
    /// task.
    ///
    /// Returns `false` when the task should stop (the client is gone), `true`
    /// otherwise.
    #[instrument(skip(client, thread_subscriber_sender))]
    async fn handle_thread_subscriber_linked_chunk_update(
        client: &WeakClient,
        thread_subscriber_sender: &Sender<()>,
        up: RoomEventCacheLinkedChunkUpdate,
    ) -> bool {
        let Some(client) = client.get() else {
            debug!("Client is shutting down, exiting thread subscriber task");
            return false;
        };

        let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
            trace!("received an update for a non-thread linked chunk, ignoring");
            return true;
        };

        let Some(room) = client.get_room(room_id) else {
            warn!(%room_id, "unknown room");
            return true;
        };

        let thread_root = thread_root.clone();

        let mut new_events = up.events().peekable();

        if new_events.peek().is_none() {
            // No new events: nothing to do.
            return true;
        }

        // Compute the push context without taking thread subscriptions into account:
        // we're trying to decide whether to subscribe to the thread in the first
        // place.
        let with_thread_subscriptions = false;

        let Some(push_context) = room
            .push_context_internal(with_thread_subscriptions)
            .await
            .inspect_err(|err| {
                warn!("Failed to get push context for threads: {err}");
            })
            .ok()
            .flatten()
        else {
            warn!("Missing push context for thread subscriptions.");
            return true;
        };

        let mut subscribe_up_to = None;

        // Find the most recent event in the thread that would trigger a notification,
        // if any, and subscribe up to it.
        for ev in new_events.rev() {
            if push_context
                .for_event(ev.raw())
                .await
                .into_iter()
                .any(|action| action.should_notify())
            {
                let Some(event_id) = ev.event_id() else {
                    continue;
                };
                subscribe_up_to = Some(event_id);
                break;
            }
        }

        if let Some(event_id) = subscribe_up_to {
            trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
            if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
                warn!(%err, "Failed to subscribe to thread");
            } else {
                let _ = thread_subscriber_sender.send(());
            }
        }

        true
    }

    /// Handle a single send queue update on behalf of the thread subscriber
    /// task, tracking local echoes so that sending an in-thread event
    /// subscribes the user to that thread.
    ///
    /// Returns `false` when the task should stop (the client is gone), `true`
    /// otherwise.
    #[instrument(skip(client, thread_subscriber_sender))]
    async fn handle_thread_subscriber_send_queue_update(
        client: &WeakClient,
        thread_subscriber_sender: &Sender<()>,
        events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
        up: SendQueueUpdate,
    ) -> bool {
        let Some(client) = client.get() else {
            debug!("Client is shutting down, exiting thread subscriber task");
            return false;
        };

        let room_id = up.room_id;
        let Some(room) = client.get_room(&room_id) else {
            warn!(%room_id, "unknown room");
            return true;
        };

        let (thread_root, subscribe_up_to) = match up.update {
            RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
                match local_echo.content {
                    LocalEchoContent::Event { serialized_event, .. } => {
                        if let Some(thread_root) =
                            extract_thread_root_from_content(serialized_event.into_raw().0)
                        {
                            events_being_sent.insert(local_echo.transaction_id, thread_root);
                        }
                    }
                    LocalEchoContent::React { .. } => {
                        // Nothing to do for a reaction.
                    }
                }
                return true;
            }

            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
                events_being_sent.remove(&transaction_id);
                return true;
            }

            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
                if let Some(thread_root) =
                    extract_thread_root_from_content(new_content.into_raw().0)
                {
                    events_being_sent.insert(transaction_id, thread_root);
                } else {
                    // The replacement may not be an in-thread event anymore.
                    events_being_sent.remove(&transaction_id);
                }
                return true;
            }

            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
                if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
                    (thread_root, event_id)
                } else {
                    trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
                    return true;
                }
            }

            RoomSendQueueUpdate::SendError { .. }
            | RoomSendQueueUpdate::RetryEvent { .. }
            | RoomSendQueueUpdate::MediaUpload { .. } => {
                // Nothing to do for these updates.
                return true;
            }
        };

        trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");
        if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await
        {
            warn!(%err, "Failed to subscribe to thread");
        } else {
            let _ = thread_subscriber_sender.send(());
        }

        true
    }

    /// Background task listening to both send queue updates and linked chunk
    /// updates, and subscribing the user to threads they're active in.
    #[instrument(skip_all)]
    async fn thread_subscriber_task(
        client: WeakClient,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
        thread_subscriber_sender: Sender<()>,
    ) {
        let mut send_q_rx = if let Some(client) = client.get() {
            if !client.enabled_thread_subscriptions() {
                trace!("Thread subscriptions are not enabled, not spawning thread subscriber task");
                return;
            }

            client.send_queue().subscribe()
        } else {
            trace!("Client is shutting down, not spawning thread subscriber task");
            return;
        };

        let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();

        // Maps the transaction ID of an in-thread event being sent to the thread it
        // belongs to; filled by send queue updates.
        let mut events_being_sent = HashMap::new();

        loop {
            select! {
                res = send_q_rx.recv() => {
                    match res {
                        Ok(up) => {
                            if !Self::handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
                                break;
                            }
                        }
                        Err(RecvError::Closed) => {
                            debug!("Send queue update channel has been closed, exiting thread subscriber task");
                            break;
                        }
                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(num_skipped, "Lagged behind send queue updates");
                        }
                    }
                }

                res = linked_chunk_rx.recv() => {
                    match res {
                        Ok(up) => {
                            if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
                                break;
                            }
                        }
                        Err(RecvError::Closed) => {
                            debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
                            break;
                        }
                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(num_skipped, "Lagged behind linked chunk updates");
                        }
                    }
                }
            }
        }
    }

    /// Background task feeding the search index from linked chunk updates,
    /// when the `experimental-search` feature is enabled.
    #[cfg(feature = "experimental-search")]
    #[instrument(skip_all)]
    async fn search_indexing_task(
        client: WeakClient,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
    ) {
        let mut linked_chunk_update_receiver = linked_chunk_update_sender.subscribe();

        loop {
            match linked_chunk_update_receiver.recv().await {
                Ok(room_ec_lc_update) => {
                    let OwnedLinkedChunkId::Room(room_id) =
                        room_ec_lc_update.linked_chunk_id.clone()
                    else {
                        trace!("Received non-room updates, ignoring.");
                        continue;
                    };

                    let mut timeline_events = room_ec_lc_update.events().peekable();

                    if timeline_events.peek().is_none() {
                        continue;
                    }

                    let Some(client) = client.get() else {
                        trace!("Client is shutting down, exiting search indexing task");
                        return;
                    };

                    let (room_cache, _drop_handles) =
                        match client.event_cache().for_room(&room_id).await {
                            Ok(room_cache) => room_cache,
                            Err(err) => {
                                warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
                                continue;
                            }
                        };

                    let Some(room) = client.get_room(&room_id) else {
                        warn!(for_room = %room_id, "Failed to get the room while indexing");
                        continue;
                    };
                    let redaction_rules =
                        room.clone_info().room_version_rules_or_default().redaction;

                    let mut search_index_guard = client.search_index().lock().await;

                    if let Err(err) = search_index_guard
                        .bulk_handle_timeline_event(
                            timeline_events,
                            &room_cache,
                            &room_id,
                            &redaction_rules,
                        )
                        .await
                    {
                        error!("Failed to handle events for indexing: {err}")
                    }
                }
                Err(RecvError::Closed) => {
                    debug!("Linked chunk update channel has been closed, exiting search indexing task");
                    break;
                }
                Err(RecvError::Lagged(num_skipped)) => {
                    warn!(num_skipped, "Lagged behind linked chunk updates");
                }
            }
        }
    }
}

/// The inner, shared state of an [`EventCache`].
struct EventCacheInner {
    /// A weak reference to the owning client, so the cache doesn't keep it
    /// alive.
    client: WeakClient,

    /// Reference to the underlying store.
    store: EventCacheStoreLock,

    /// A lock making sure that, even if multiple batches of room updates come
    /// in, they are all processed one at a time.
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-filled cache of live [`RoomEventCache`], one per room.
    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,

    /// Handles to the tasks spawned by [`EventCache::subscribe`], kept alive
    /// for as long as the event cache exists.
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// A sender notifying the auto-shrink task that a room's linked chunk
    /// should be shrunk, because all its subscribers are gone.
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,

    /// A sender for room generic updates, observed via
    /// [`EventCache::subscribe_to_room_generic_updates`].
    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

    /// A sender for linked chunk updates, observed by the internal background
    /// tasks (thread subscriber, search indexing).
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// Background task in charge of automatically subscribing the user to
    /// threads; aborted when the event cache is dropped.
    _thread_subscriber_task: AbortOnDrop<()>,

    /// Background task in charge of indexing events for search; aborted when
    /// the event cache is dropped.
    #[cfg(feature = "experimental-search")]
    _search_indexing_task: AbortOnDrop<()>,

    /// A receiver keeping the thread subscriber update channel open, so that
    /// late subscribers (e.g. tests) can still observe it.
    thread_subscriber_receiver: Receiver<()>,
}

/// The payload sent over the auto-shrink channel: the ID of the room whose
/// linked chunk should be considered for auto-shrinking.
type AutoShrinkChannelPayload = OwnedRoomId;

impl EventCacheInner {
    fn client(&self) -> Result<Client> {
        self.client.get().ok_or(EventCacheError::ClientDropped)
    }

    /// Clears all the rooms' data, in memory and in storage.
    async fn clear_all_rooms(&self) -> Result<()> {
        // Note: don't clear the `by_room` map itself; observers who subscribed to a
        // room would otherwise never receive further updates, since those are sent
        // over per-room channels kept in that map.
        let rooms = self.by_room.write().await;

        // Take all the room state locks first, so that the storage and the in-memory
        // states are cleared as one logical transaction.
        let room_locks = join_all(
            rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
        )
        .await;

        // Clear the storage for all the rooms.
        self.store.lock().await?.clear_all_linked_chunks().await?;

        // At this point the in-memory linked chunks are desynchronized from storage:
        // reset them, and notify observers.
        try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move {
            let updates_as_vector_diffs = state_guard.reset().await?;

            let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: updates_as_vector_diffs,
                origin: EventsOrigin::Cache,
            });

            let _ = room
                .inner
                .generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() });

            Ok::<_, EventCacheError>(())
        }))
        .await?;

        Ok(())
    }

    /// Handles a single batch of room updates.
    #[instrument(skip(self, updates))]
    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        // First, take the lock that indicates we're processing updates, so that
        // multiple batches aren't handled concurrently.
        let _lock = {
            let _timer = timer!("Taking the `multiple_room_updates_lock`");
            self.multiple_room_updates_lock.lock().await
        };

        // Handle the updates for rooms we've left first, then for joined rooms.
        for (room_id, left_room_update) in updates.left {
            let room = self.for_room(&room_id).await?;

            if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
                // Non-fatal error: log it, and continue with the next room.
                error!("handling left room update: {err}");
            }
        }

        for (room_id, joined_room_update) in updates.joined {
            trace!(?room_id, "Handling a `JoinedRoomUpdate`");

            let room = self.for_room(&room_id).await?;

            if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
                // Non-fatal error: log it, and continue with the next room.
                error!(%room_id, "handling joined room update: {err}");
            }
        }

        Ok(())
    }

    /// Return the [`RoomEventCache`] for the given room, creating it if it
    /// doesn't exist yet.
    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
        // Fast path: the entry exists; a read lock is cheaper than a write lock.
        let by_room_guard = self.by_room.read().await;

        match by_room_guard.get(room_id) {
            Some(room) => Ok(room.clone()),

            None => {
                // Slow path: the entry doesn't exist; acquire a write lock.
                drop(by_room_guard);
                let mut by_room_guard = self.by_room.write().await;

                // In the meantime, another caller may have obtained write access and
                // inserted the room already; check before inserting it ourselves.
                if let Some(room) = by_room_guard.get(room_id) {
                    return Ok(room.clone());
                }

                let pagination_status =
                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });

                let Some(client) = self.client.get() else {
                    return Err(EventCacheError::ClientDropped);
                };

                let room = client
                    .get_room(room_id)
                    .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
                let room_version_rules = room.clone_info().room_version_rules_or_default();

                let enabled_thread_support = matches!(
                    client.base_client().threading_support,
                    ThreadingSupport::Enabled { .. }
                );

                let room_state = RoomEventCacheState::new(
                    room_id.to_owned(),
                    room_version_rules,
                    enabled_thread_support,
                    self.linked_chunk_update_sender.clone(),
                    self.store.clone(),
                    pagination_status.clone(),
                )
                .await?;

                let timeline_is_not_empty =
                    room_state.room_linked_chunk().revents().next().is_some();

                // Note: we must have called `EventCache::subscribe` before getting here,
                // which initializes the sender.
                let auto_shrink_sender =
                    self.auto_shrink_sender.get().cloned().expect(
                        "we must have called `EventCache::subscribe()` before calling here.",
                    );

                let room_event_cache = RoomEventCache::new(
                    self.client.clone(),
                    room_state,
                    pagination_status,
                    room_id.to_owned(),
                    auto_shrink_sender,
                    self.generic_update_sender.clone(),
                );

                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());

                // If events have been loaded from storage, the room timeline isn't
                // empty: notify the generic update listeners.
                if timeline_is_not_empty {
                    let _ = self
                        .generic_update_sender
                        .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
                }

                Ok(room_event_cache)
            }
        }
    }
}

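/// The result of a single back-pagination request.
///
/// A sketch of how this is typically obtained (the room event cache is
/// assumed to have been retrieved via `Room::event_cache` or
/// [`EventCache::for_room`]):
///
/// ```ignore
/// let pagination = room_event_cache.pagination();
/// let outcome = pagination.run_backwards_once(20).await?;
/// if outcome.reached_start {
///     // Nothing left to paginate.
/// }
/// ```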
#[derive(Debug)]
pub struct BackPaginationOutcome {
    /// Did the back-pagination reach the start of the timeline?
    pub reached_start: bool,

    /// All the events that have been returned in the back-pagination request.
    ///
    /// Events are presented in reverse order: the first element of the vec,
    /// if present, is the most "recent" event from the chunk (or technically,
    /// the last one in the topological ordering).
    pub events: Vec<TimelineEvent>,
}

/// A generic update about a room: something happened in its timeline, without
/// carrying the details of what changed.
#[derive(Clone, Debug)]
pub struct RoomEventCacheGenericUpdate {
    /// The room that has been updated.
    pub room_id: OwnedRoomId,
}

/// An update sent whenever a linked chunk (room timeline or thread) has been
/// modified, carrying the raw linked chunk updates.
#[derive(Clone, Debug)]
struct RoomEventCacheLinkedChunkUpdate {
    /// The linked chunk affected by this update.
    linked_chunk_id: OwnedLinkedChunkId,

    /// The raw updates that happened to the linked chunk.
    updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
}

impl RoomEventCacheLinkedChunkUpdate {
    /// Return all the new events propagated by this update, in topological
    /// order.
    pub fn events(self) -> impl DoubleEndedIterator<Item = TimelineEvent> {
        use itertools::Either;
        self.updates.into_iter().flat_map(|update| match update {
            linked_chunk::Update::PushItems { items, .. } => {
                Either::Left(Either::Left(items.into_iter()))
            }
            linked_chunk::Update::ReplaceItem { item, .. } => {
                Either::Left(Either::Right(std::iter::once(item)))
            }
            linked_chunk::Update::RemoveItem { .. }
            | linked_chunk::Update::DetachLastItems { .. }
            | linked_chunk::Update::StartReattachItems
            | linked_chunk::Update::EndReattachItems
            | linked_chunk::Update::NewItemsChunk { .. }
            | linked_chunk::Update::NewGapChunk { .. }
            | linked_chunk::Update::RemoveChunk(..)
            | linked_chunk::Update::Clear => {
                // These updates don't carry any new events.
                Either::Right(std::iter::empty())
            }
        })
    }
}

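/// An update related to events that happened in a room.
///
/// Updates are received from [`RoomEventCache::subscribe`]; a minimal
/// consumption sketch (hypothetical `room_event_cache`):
///
/// ```ignore
/// let (initial_events, mut updates) = room_event_cache.subscribe().await;
/// while let Ok(update) = updates.recv().await {
///     match update {
///         RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
///             // Apply the diffs to a locally maintained list of events.
///         }
///         _ => {}
///     }
/// }
/// ```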
#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
    /// The fully read marker has moved to a different event.
    MoveReadMarkerTo {
        /// Event at which the read marker is now pointing.
        event_id: OwnedEventId,
    },

    /// The members have changed.
    UpdateMembers {
        /// Collection of ambiguity changes that room member events trigger.
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    },

    /// The room has received updates for the timeline as full diffs.
    UpdateTimelineEvents {
        /// Diffs to apply to the timeline.
        diffs: Vec<VectorDiff<TimelineEvent>>,

        /// Where the diffs are coming from.
        origin: EventsOrigin,
    },

    /// The room has received new ephemeral events.
    AddEphemeralEvents {
        /// The ephemeral events received.
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    },
}

/// Indicate where events are coming from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events are coming from a sync.
    Sync,

    /// Events are coming from a pagination.
    Pagination,

    /// The cause of the change is purely internal to the cache.
    Cache,
}

#[cfg(test)]
mod tests {
    use std::{ops::Not, sync::Arc, time::Duration};

    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::{
        RoomState,
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
    };
    use matrix_sdk_test::{
        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
    };
    use ruma::{event_id, room_id, serde::Raw, user_id};
    use serde_json::json;
    use tokio::time::sleep;

    use super::{EventCacheError, RoomEventCacheGenericUpdate, RoomEventCacheUpdate};
    use crate::test_utils::{
        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
    };

    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // Requesting a room's event cache before subscribing the event cache
        // returns an error.
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    #[async_test]
    async fn test_uniq_read_marker() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
        let (events, mut stream) = room_event_cache.subscribe().await;

        assert!(events.is_empty());

        // When sending the same read marker event multiple times…
        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
            .await
            .unwrap();

        // …only a single read marker update is received.
        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        assert!(stream.recv().now_or_never().is_none());

        // The linked chunk didn't change, so no generic update is sent either.
        assert!(generic_stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
        client.base_client().get_or_create_room(room_id2, RoomState::Joined);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Insert events into two rooms.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
        updates.joined.insert(room_id2.to_owned(), joined_room_update2);

        // Have the event cache handle them.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // Now retrieve the events of the first room, one by one.
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.find_event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.find_event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        // The lookup is per-room, so an event from the other room isn't found.
        assert!(room_event_cache.find_event(eid3).await.is_none());
    }

    #[async_test]
    async fn test_save_event() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;

        // A saved event can be retrieved later.
        assert!(room_event_cache.find_event(event_id).await.is_some());
    }

    #[async_test]
    async fn test_generic_update_when_loading_rooms() {
        // Create a client with two rooms, one of which has an event in storage.
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id_0 = room_id!("!raclette:patate.ch");
        let room_id_1 = room_id!("!fondue:patate.ch");

        let event_factory = EventFactory::new().room(room_id_0).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        client
            .event_cache_store()
            .lock()
            .await
            .unwrap()
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id_0),
                vec![
                    // Create a chunk, and push one event into it.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Opening the room that has an event in storage emits a generic update.
        {
            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();

            assert_matches!(
                generic_stream.recv().await,
                Ok(RoomEventCacheGenericUpdate { room_id }) => {
                    assert_eq!(room_id, room_id_0);
                }
            );
        }

        // Opening the room with an empty timeline emits no generic update.
        {
            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();

            assert!(generic_stream.recv().now_or_never().is_none());
        }
    }

    #[async_test]
    async fn test_generic_update_when_paginating_room() {
        // Create a client with one room, whose linked chunk in storage has two
        // empty chunks followed by two chunks holding one event each.
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_factory = EventFactory::new().room(room_id).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        client
            .event_cache_store()
            .lock()
            .await
            .unwrap()
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(2)),
                        new: ChunkIdentifier::new(3),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(3), 0),
                        items: vec![
                            event_factory
                                .text_msg("world")
                                .sender(user)
                                .event_id(event_id!("$ev1"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Opening the room loads the last chunk: a generic update is received.
        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let pagination = room_event_cache.pagination();

        // The first back-pagination loads one event: a generic update is received.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert_eq!(pagination_outcome.events.len(), 1);
        assert!(pagination_outcome.reached_start.not());
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        // The next back-pagination loads an empty chunk: no update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.events.is_empty());
        assert!(pagination_outcome.reached_start.not());
        assert!(generic_stream.recv().now_or_never().is_none());

        // The last back-pagination reaches the start of the timeline: no update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.reached_start);
        assert!(generic_stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_for_room_when_room_is_not_found() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Getting the event cache for an unknown room fails.
        assert_matches!(
            event_cache.for_room(room_id).await,
            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
                assert_eq!(room_id, not_found_room_id);
            }
        );

        // Once the room is created, the error disappears.
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        assert!(event_cache.for_room(room_id).await.is_ok());
    }

    // This test doesn't run on the wasm32 target, because of the sleeps.
    #[cfg(not(target_family = "wasm"))]
    #[async_test]
    async fn test_no_refcycle_event_cache_tasks() {
        let client = MockClientBuilder::new(None).build().await;

        // Wait for the client's background tasks to settle.
        sleep(Duration::from_secs(1)).await;

        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
        assert_eq!(event_cache_weak.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            // Make the client aware of the room.
            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Give the background tasks some time to wind down.
        sleep(Duration::from_secs(1)).await;

        // The event cache must not be kept alive by its own background tasks.
        assert_eq!(
            event_cache_weak.strong_count(),
            0,
            "Too many strong references to the event cache {}",
            event_cache_weak.strong_count()
        );
    }
}