#![forbid(missing_docs)]

//! The event cache is an abstraction layer sitting between the Rust SDK and
//! a client app. It caches the events received from sync responses and
//! back-paginations, and serves them back to observers as per-room
//! timelines, persisted in the event cache store.

use std::{
    collections::{BTreeMap, HashMap},
    fmt,
    sync::{Arc, OnceLock, Weak},
};

use eyeball::{SharedObservable, Subscriber};
use eyeball_im::VectorDiff;
use futures_util::future::{join_all, try_join_all};
use matrix_sdk_base::{
    deserialized_responses::{AmbiguityChange, TimelineEvent},
    event_cache::{
        store::{EventCacheStoreError, EventCacheStoreLock},
        Gap,
    },
    executor::AbortOnDrop,
    linked_chunk::{self, lazy_loader::LazyLoaderError, OwnedLinkedChunkId},
    serde_helpers::extract_thread_root_from_content,
    store_locks::LockStoreError,
    sync::RoomUpdates,
    timer, ThreadingSupport,
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use room::RoomEventCacheState;
#[cfg(feature = "experimental-search")]
use ruma::events::AnySyncMessageLikeEvent;
use ruma::{
    events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, OwnedTransactionId,
    RoomId,
};
use tokio::{
    select,
    sync::{
        broadcast::{channel, error::RecvError, Receiver, Sender},
        mpsc, Mutex, RwLock,
    },
};
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span};

use crate::{
    client::WeakClient,
    send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
    Client,
};

mod deduplicator;
mod pagination;
mod room;

pub use pagination::{RoomPagination, RoomPaginationStatus};
pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate};

/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] instance hasn't subscribed to sync responses yet.
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// The room hasn't been found in the client.
    #[error("Room `{room_id}` is not found.")]
    RoomNotFound {
        /// The id of the room that wasn't found.
        room_id: OwnedRoomId,
    },

    /// An error happened while back-paginating.
    #[error(transparent)]
    BackpaginationError(Box<crate::Error>),

    /// A back-pagination was already running for this room.
    #[error("We were already back-paginating.")]
    AlreadyBackpaginating,

    /// An error happened when interacting with storage.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happened when attempting to lock storage.
    #[error(transparent)]
    LockingStorage(#[from] LockStoreError),

    /// The [`Client`] that owns this event cache has been dropped.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happened while lazily loading a linked chunk.
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),

    /// The linked chunk metadata loaded from storage is invalid.
    #[error("the linked chunk metadata is invalid: {details}")]
    InvalidLinkedChunkMetadata {
        /// A human-readable description of what's invalid.
        details: String,
    },
}

/// A result using the [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;

/// Holds the handles to the tasks spawned by the [`EventCache`], and aborts
/// them when dropped.
pub struct EventCacheDropHandles {
    /// Task that listens to room updates.
    listen_updates_task: JoinHandle<()>,

    /// Task that listens to updates to the user's ignored user list.
    ignore_user_list_update_task: JoinHandle<()>,

    /// Task that automatically shrinks the linked chunks of unobserved rooms.
    auto_shrink_linked_chunk_task: JoinHandle<()>,
}

impl fmt::Debug for EventCacheDropHandles {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
    }
}

impl Drop for EventCacheDropHandles {
    fn drop(&mut self) {
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
        self.auto_shrink_linked_chunk_task.abort();
    }
}

/// An event cache, providing lots of useful functionality for clients.
///
/// Cloning is shallow, and thus is cheap to do.
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache.
    inner: Arc<EventCacheInner>,
}

impl fmt::Debug for EventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCache").finish_non_exhaustive()
    }
}

impl EventCache {
    /// Create a new [`EventCache`] for the given client and store.
    pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
        let (generic_update_sender, _) = channel(32);
        let (linked_chunk_update_sender, _) = channel(32);

        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(32);
        let thread_subscriber_task = AbortOnDrop::new(spawn(Self::thread_subscriber_task(
            client.clone(),
            linked_chunk_update_sender.clone(),
            thread_subscriber_sender,
        )));

        #[cfg(feature = "experimental-search")]
        let search_indexing_task = AbortOnDrop::new(spawn(Self::search_indexing_task(
            client.clone(),
            linked_chunk_update_sender.clone(),
        )));

        Self {
            inner: Arc::new(EventCacheInner {
                client,
                store: event_cache_store,
                multiple_room_updates_lock: Default::default(),
                by_room: Default::default(),
                drop_handles: Default::default(),
                auto_shrink_sender: Default::default(),
                generic_update_sender,
                linked_chunk_update_sender,
                _thread_subscriber_task: thread_subscriber_task,
                #[cfg(feature = "experimental-search")]
                _search_indexing_task: search_indexing_task,
                thread_subscriber_receiver,
            }),
        }
    }

    /// Subscribe to the updates sent by the thread subscriber task, each time
    /// it has subscribed to a new thread. Only useful for tests.
    #[doc(hidden)]
    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
        self.inner.thread_subscriber_receiver.resubscribe()
    }

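    /// Starts subscribing the [`EventCache`] to sync responses, if not done
    /// before.
    ///
    /// Re-running this method is a no-op after the first call.
    ///
    /// A minimal usage sketch (crate-internal, hence ignored as a doc test),
    /// assuming a logged-in `client` and a `room_id` it knows about:
    ///
    /// ```rust,ignore
    /// let event_cache = client.event_cache();
    /// // Must be called once before `for_room` can succeed.
    /// event_cache.subscribe()?;
    /// let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await?;
    /// ```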
    pub fn subscribe(&self) -> Result<()> {
        let client = self.inner.client()?;

        let _ = self.inner.drop_handles.get_or_init(|| {
            // Spawn the task that will listen to all the room updates at once.
            let listen_updates_task = spawn(Self::listen_task(
                self.inner.clone(),
                client.subscribe_to_all_room_updates(),
            ));

            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
                self.inner.clone(),
                client.subscribe_to_ignore_user_list_changes(),
            ));

            let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);

            // Save the sender, so each per-room cache can signal when it may
            // need to be auto-shrunk.
            self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);

            let auto_shrink_linked_chunk_task = spawn(Self::auto_shrink_linked_chunk_task(
                Arc::downgrade(&self.inner),
                auto_shrink_receiver,
            ));

            Arc::new(EventCacheDropHandles {
                listen_updates_task,
                ignore_user_list_update_task,
                auto_shrink_linked_chunk_task,
            })
        });

        Ok(())
    }

    #[instrument(skip_all)]
    async fn ignore_user_list_update_task(
        inner: Arc<EventCacheInner>,
        mut ignore_user_list_stream: Subscriber<Vec<String>>,
    ) {
        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
        span.follows_from(Span::current());

        async move {
            while ignore_user_list_stream.next().await.is_some() {
                info!("Received an ignore user list change");
                if let Err(err) = inner.clear_all_rooms().await {
                    error!("when clearing room storage after ignore user list change: {err}");
                }
            }
            info!("Ignore user list stream has closed");
        }
        .instrument(span)
        .await;
    }

    /// Handle a set of room updates directly. Exposed for tests and
    /// benchmarks only.
    #[doc(hidden)]
    pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        self.inner.handle_room_updates(updates).await
    }

    #[instrument(skip_all)]
    async fn listen_task(
        inner: Arc<EventCacheInner>,
        mut room_updates_feed: Receiver<RoomUpdates>,
    ) {
        trace!("Spawning the listen task");
        loop {
            match room_updates_feed.recv().await {
                Ok(updates) => {
                    trace!("Receiving `RoomUpdates`");

                    if let Err(err) = inner.handle_room_updates(updates).await {
                        match err {
                            EventCacheError::ClientDropped => {
                                // The client has been dropped: there's no
                                // point in keeping this task alive.
                                info!(
                                    "Closing the event cache global listen task because client dropped"
                                );
                                break;
                            }
                            err => {
                                error!("Error when handling room updates: {err}");
                            }
                        }
                    }
                }

                Err(RecvError::Lagged(num_skipped)) => {
                    // We've lagged behind sync responses; the only sane way
                    // to recover is to clear all the rooms and start over.
                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
                    if let Err(err) = inner.clear_all_rooms().await {
                        error!("when clearing storage after lag in listen_task: {err}");
                    }
                }

                Err(RecvError::Closed) => {
                    info!("Closing the event cache global listen task because receiver closed");
                    break;
                }
            }
        }
    }

    /// Long-running task that waits for auto-shrink notifications sent by the
    /// per-room caches, and shrinks a room's linked chunk to its last chunk
    /// when that room has no subscribers anymore.
    ///
    /// This keeps the in-memory footprint of the event cache bounded: once
    /// nobody observes a room's timeline, there's no need to keep all its
    /// events loaded in memory.
    #[instrument(skip_all)]
    async fn auto_shrink_linked_chunk_task(
        inner: Weak<EventCacheInner>,
        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
    ) {
        while let Some(room_id) = rx.recv().await {
            trace!(for_room = %room_id, "received notification to shrink");

            let Some(inner) = inner.upgrade() else {
                return;
            };

            let room = match inner.for_room(&room_id).await {
                Ok(room) => room,
                Err(err) => {
                    warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
                    continue;
                }
            };

            trace!("waiting for state lock…");
            let mut state = room.inner.state.write().await;

            match state.auto_shrink_if_no_subscribers().await {
                Ok(diffs) => {
                    if let Some(diffs) = diffs {
                        // The linked chunk has been shrunk: notify observers
                        // about the new timeline state.
                        if !diffs.is_empty() {
                            let _ = room.inner.sender.send(
                                RoomEventCacheUpdate::UpdateTimelineEvents {
                                    diffs,
                                    origin: EventsOrigin::Cache,
                                },
                            );
                        }
                    } else {
                        debug!("auto-shrinking didn't happen");
                    }
                }

                Err(err) => {
                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
                }
            }
        }

        info!("Auto-shrink linked chunk task has been closed, exiting");
    }

    /// Return a room-specific view over the [`EventCache`].
    ///
    /// Fails with [`EventCacheError::NotSubscribedYet`] if
    /// [`EventCache::subscribe`] hasn't been called yet.
    pub(crate) async fn for_room(
        &self,
        room_id: &RoomId,
    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
            return Err(EventCacheError::NotSubscribedYet);
        };

        let room = self.inner.for_room(room_id).await?;

        Ok((room, drop_handles))
    }

    /// Clear all the events from the in-memory and storage caches, for all
    /// rooms.
    pub async fn clear_all_rooms(&self) -> Result<()> {
        self.inner.clear_all_rooms().await
    }

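    /// Subscribe to room _generic_ updates, i.e. notifications that
    /// something happened in a room's event cache, without the detail of
    /// what.
    ///
    /// A minimal consumption sketch, assuming an `event_cache` that has
    /// already subscribed to sync responses:
    ///
    /// ```rust,ignore
    /// let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
    /// while let Ok(update) = generic_stream.recv().await {
    ///     // Only the room id is conveyed; interested parties can re-read
    ///     // the room's state lazily.
    ///     println!("something happened in {}", update.room_id);
    /// }
    /// ```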
    pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
        self.inner.generic_update_sender.subscribe()
    }

    /// Handle a single linked chunk update in the thread subscriber task.
    ///
    /// Returns `true` if the task should keep running, `false` if it must
    /// exit (because the client has been dropped).
    #[instrument(skip(client, thread_subscriber_sender))]
    async fn handle_thread_subscriber_linked_chunk_update(
        client: &WeakClient,
        thread_subscriber_sender: &Sender<()>,
        up: RoomEventCacheLinkedChunkUpdate,
    ) -> bool {
        let Some(client) = client.get() else {
            debug!("Client is shutting down, exiting thread subscriber task");
            return false;
        };

        let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
            trace!("received an update for a non-thread linked chunk, ignoring");
            return true;
        };

        let Some(room) = client.get_room(room_id) else {
            warn!(%room_id, "unknown room");
            return true;
        };

        let thread_root = thread_root.clone();

        let mut new_events = up.events().peekable();

        if new_events.peek().is_none() {
            // No new events in this update: nothing to do.
            return true;
        }

        // Compute the push actions without taking thread subscriptions into
        // account, precisely because we're trying to figure out whether we
        // should subscribe to this thread in the first place.
        let with_thread_subscriptions = false;

        let Some(push_context) = room
            .push_context_internal(with_thread_subscriptions)
            .await
            .inspect_err(|err| {
                warn!("Failed to get push context for threads: {err}");
            })
            .ok()
            .flatten()
        else {
            warn!("Missing push context for thread subscriptions.");
            return true;
        };

        let mut subscribe_up_to = None;

        // Find the most recent event that would cause a notification,
        // iterating from the latest event to the earliest.
        for ev in new_events.rev() {
            if push_context
                .for_event(ev.raw())
                .await
                .into_iter()
                .any(|action| action.should_notify())
            {
                let Some(event_id) = ev.event_id() else {
                    // No event id: skip this event.
                    continue;
                };
                subscribe_up_to = Some(event_id);
                break;
            }
        }

        // If we found a notifying event, subscribe to the thread up to that
        // event.
        if let Some(event_id) = subscribe_up_to {
            trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
            if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
                warn!(%err, "Failed to subscribe to thread");
            } else {
                let _ = thread_subscriber_sender.send(());
            }
        }

        true
    }

    /// Handle a single send queue update in the thread subscriber task.
    ///
    /// Keeps track of in-flight local echoes sent in threads, so the user can
    /// be subscribed to a thread as soon as one of their own events is sent
    /// there. Returns `true` if the task should keep running, `false` if it
    /// must exit (because the client has been dropped).
    #[instrument(skip(client, thread_subscriber_sender))]
    async fn handle_thread_subscriber_send_queue_update(
        client: &WeakClient,
        thread_subscriber_sender: &Sender<()>,
        events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
        up: SendQueueUpdate,
    ) -> bool {
        let Some(client) = client.get() else {
            debug!("Client is shutting down, exiting thread subscriber task");
            return false;
        };

        let room_id = up.room_id;
        let Some(room) = client.get_room(&room_id) else {
            warn!(%room_id, "unknown room");
            return true;
        };

        let (thread_root, subscribe_up_to) = match up.update {
            RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
                match local_echo.content {
                    LocalEchoContent::Event { serialized_event, .. } => {
                        if let Some(thread_root) =
                            extract_thread_root_from_content(serialized_event.into_raw().0)
                        {
                            events_being_sent.insert(local_echo.transaction_id, thread_root);
                        }
                    }
                    LocalEchoContent::React { .. } => {
                        // Reactions don't warrant subscribing to a thread.
                    }
                }
                return true;
            }

            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
                events_being_sent.remove(&transaction_id);
                return true;
            }

            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
                if let Some(thread_root) =
                    extract_thread_root_from_content(new_content.into_raw().0)
                {
                    events_being_sent.insert(transaction_id, thread_root);
                } else {
                    // The replacement may have removed the thread
                    // relationship, so forget about this transaction.
                    events_being_sent.remove(&transaction_id);
                }
                return true;
            }

            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
                if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
                    (thread_root, event_id)
                } else {
                    trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
                    return true;
                }
            }

            RoomSendQueueUpdate::SendError { .. }
            | RoomSendQueueUpdate::RetryEvent { .. }
            | RoomSendQueueUpdate::MediaUpload { .. } => {
                // Not relevant for thread subscriptions.
                return true;
            }
        };

        trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");
        if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await
        {
            warn!(%err, "Failed to subscribe to thread");
        } else {
            let _ = thread_subscriber_sender.send(());
        }

        true
    }

    /// Long-running task that automatically subscribes the user to a thread
    /// when one of their own events, or a notifying event, shows up in it.
    #[instrument(skip_all)]
    async fn thread_subscriber_task(
        client: WeakClient,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
        thread_subscriber_sender: Sender<()>,
    ) {
        let mut send_q_rx = if let Some(client) = client.get() {
            if !client.enabled_thread_subscriptions() {
                trace!("Thread subscriptions are not enabled, not spawning thread subscriber task");
                return;
            }

            client.send_queue().subscribe()
        } else {
            trace!("Client is shutting down, not spawning thread subscriber task");
            return;
        };

        let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();

        // Maps the transaction id of each in-flight event to its thread root,
        // so we know which thread to subscribe to once the event is sent.
        let mut events_being_sent = HashMap::new();

        loop {
            select! {
                res = send_q_rx.recv() => {
                    match res {
                        Ok(up) => {
                            if !Self::handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
                                break;
                            }
                        }
                        Err(RecvError::Closed) => {
                            debug!("Send queue update channel has been closed, exiting thread subscriber task");
                            break;
                        }
                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(num_skipped, "Lagged behind send queue updates");
                        }
                    }
                }

                res = linked_chunk_rx.recv() => {
                    match res {
                        Ok(up) => {
                            if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
                                break;
                            }
                        }
                        Err(RecvError::Closed) => {
                            debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
                            break;
                        }
                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(num_skipped, "Lagged behind linked chunk updates");
                        }
                    }
                }
            }
        }
    }

    /// Long-running task that listens to linked chunk updates and feeds new
    /// room events into the client's search index.
    #[cfg(feature = "experimental-search")]
    #[instrument(skip_all)]
    async fn search_indexing_task(
        client: WeakClient,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
    ) {
        let mut linked_chunk_update_receiver = linked_chunk_update_sender.subscribe();

        loop {
            match linked_chunk_update_receiver.recv().await {
                Ok(room_ec_lc_update) => {
                    let OwnedLinkedChunkId::Room(room_id) =
                        room_ec_lc_update.linked_chunk_id.clone()
                    else {
                        trace!("Received non-room updates, ignoring.");
                        continue;
                    };

                    let mut timeline_events = room_ec_lc_update.events().peekable();

                    if timeline_events.peek().is_none() {
                        continue;
                    }

                    let Some(client) = client.get() else {
                        trace!("Client is shutting down, exiting search indexing task");
                        return;
                    };

                    let mut search_index_guard = client.search_index().lock().await;

                    for event in timeline_events {
                        if let Some(message_event) = parse_timeline_event_for_search_index(&event) {
                            if let Err(err) =
                                search_index_guard.handle_event(message_event, &room_id)
                            {
                                warn!("Failed to handle event for indexing: {err}")
                            }
                        }
                    }
                }
                Err(RecvError::Closed) => {
                    debug!("Linked chunk update channel has been closed, exiting search indexing task");
                    break;
                }
                Err(RecvError::Lagged(num_skipped)) => {
                    warn!(num_skipped, "Lagged behind linked chunk updates");
                }
            }
        }
    }
}

/// Extract a message-like event suitable for search indexing from a
/// [`TimelineEvent`], filtering out UTDs, state events and events that fail
/// to deserialize.
#[cfg(feature = "experimental-search")]
fn parse_timeline_event_for_search_index(event: &TimelineEvent) -> Option<AnySyncMessageLikeEvent> {
    use ruma::events::AnySyncTimelineEvent;

    // Events we couldn't decrypt can't be indexed.
    if event.kind.is_utd() {
        return None;
    }

    match event.raw().deserialize() {
        Ok(event) => match event {
            AnySyncTimelineEvent::MessageLike(event) => Some(event),
            AnySyncTimelineEvent::State(_) => None,
        },

        Err(e) => {
            warn!("failed to parse event: {e:?}");
            None
        }
    }
}

struct EventCacheInner {
    /// A weak reference to the inner client, used to get a handle on the
    /// owning client when needed.
    client: WeakClient,

    /// Reference to the underlying store.
    store: EventCacheStoreLock,

    /// A lock used to serialize the processing of sets of room updates, when
    /// multiple rooms must be updated at once.
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-filled cache of live [`RoomEventCache`], one per room.
    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,

    /// Handles for the long-lived tasks spawned in [`EventCache::subscribe`].
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// A sender for notifications that a room *may* need to be auto-shrunk.
    ///
    /// Initialized lazily in [`EventCache::subscribe`].
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,

    /// A sender for room generic updates.
    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

    /// A sender for the raw linked chunk updates, observed by the tasks that
    /// need to see every change applied to the event cache's storage.
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// The task responsible for automatically subscribing the user to
    /// threads, when thread subscriptions are enabled; aborted on drop.
    _thread_subscriber_task: AbortOnDrop<()>,

    /// The task responsible for keeping the search index up to date; aborted
    /// on drop.
    #[cfg(feature = "experimental-search")]
    _search_indexing_task: AbortOnDrop<()>,

    /// A receiver kept around so the thread subscriber channel never closes;
    /// exposed for tests via
    /// [`EventCache::subscribe_thread_subscriber_updates`].
    thread_subscriber_receiver: Receiver<()>,
}

/// The payload sent on the auto-shrink channel: the room that may need to be
/// shrunk.
type AutoShrinkChannelPayload = OwnedRoomId;

impl EventCacheInner {
    fn client(&self) -> Result<Client> {
        self.client.get().ok_or(EventCacheError::ClientDropped)
    }

    /// Clears all the rooms' data.
    async fn clear_all_rooms(&self) -> Result<()> {
        // Note: take the locks in a consistent order (rooms map first, then
        // each room's state), and hold them while the store is cleared, so no
        // room can be observed in a half-cleared state.
        let rooms = self.by_room.write().await;

        // Take the state locks for all the rooms at once.
        let room_locks = join_all(
            rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
        )
        .await;

        // Clear the storage for all the rooms.
        self.store.lock().await?.clear_all_linked_chunks().await?;

        // At this point, the in-memory linked chunks are desynchronized from
        // the storage: reset them to an empty state and notify observers.
        try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move {
            let updates_as_vector_diffs = state_guard.reset().await?;

            let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: updates_as_vector_diffs,
                origin: EventsOrigin::Cache,
            });

            let _ = room
                .inner
                .generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() });

            Ok::<_, EventCacheError>(())
        }))
        .await?;

        Ok(())
    }

    /// Handles a single set of room updates at once.
    #[instrument(skip(self, updates))]
    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        // First, take the lock that prevents multiple sets of room updates
        // from being processed concurrently.
        let _lock = {
            let _timer = timer!("Taking the `multiple_room_updates_lock`");
            self.multiple_room_updates_lock.lock().await
        };

        // Left rooms.
        for (room_id, left_room_update) in updates.left {
            let room = self.for_room(&room_id).await?;

            if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
                // Non-fatal error, try to continue with the next room.
                error!("handling left room update: {err}");
            }
        }

        // Joined rooms.
        for (room_id, joined_room_update) in updates.joined {
            trace!(?room_id, "Handling a `JoinedRoomUpdate`");

            let room = self.for_room(&room_id).await?;

            if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
                // Non-fatal error, try to continue with the next room.
                error!(%room_id, "handling joined room update: {err}");
            }
        }

        Ok(())
    }

    /// Return a room-specific view over the [`EventCache`], creating it if it
    /// didn't exist yet.
    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
        // Fast path: the entry exists; a read lock is cheaper than a write
        // lock.
        let by_room_guard = self.by_room.read().await;

        match by_room_guard.get(room_id) {
            Some(room) => Ok(room.clone()),

            None => {
                // Slow path: the entry doesn't exist; acquire a write lock.
                drop(by_room_guard);
                let mut by_room_guard = self.by_room.write().await;

                // In the meanwhile, another caller may have obtained write
                // access and inserted the room already: check for that.
                if let Some(room) = by_room_guard.get(room_id) {
                    return Ok(room.clone());
                }

                let pagination_status =
                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });

                let Some(client) = self.client.get() else {
                    return Err(EventCacheError::ClientDropped);
                };

                let room = client
                    .get_room(room_id)
                    .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
                let room_version_rules = room.clone_info().room_version_rules_or_default();

                let enabled_thread_support = matches!(
                    client.base_client().threading_support,
                    ThreadingSupport::Enabled { .. }
                );

                let room_state = RoomEventCacheState::new(
                    room_id.to_owned(),
                    room_version_rules,
                    enabled_thread_support,
                    self.linked_chunk_update_sender.clone(),
                    self.store.clone(),
                    pagination_status.clone(),
                )
                .await?;

                let timeline_is_not_empty =
                    room_state.room_linked_chunk().revents().next().is_some();

                // Note: `EventCache::subscribe()` must have been called
                // before reaching this code, as it initializes the
                // auto-shrink sender.
                let auto_shrink_sender =
                    self.auto_shrink_sender.get().cloned().expect(
                        "we must have called `EventCache::subscribe()` before calling here.",
                    );

                let room_event_cache = RoomEventCache::new(
                    self.client.clone(),
                    room_state,
                    pagination_status,
                    room_id.to_owned(),
                    auto_shrink_sender,
                    self.generic_update_sender.clone(),
                );

                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());

                // The room has been loaded with events from the store: signal
                // it with a generic update.
                if timeline_is_not_empty {
                    let _ = self
                        .generic_update_sender
                        .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
                }

                Ok(room_event_cache)
            }
        }
    }
}

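/// The result of a single back-pagination request.
///
/// A sketch of how an outcome is typically obtained, assuming a
/// `room_event_cache`:
///
/// ```rust,ignore
/// let outcome = room_event_cache.pagination().run_backwards_once(20).await?;
/// if outcome.reached_start {
///     // All of the known history has been loaded.
/// }
/// for event in outcome.events {
///     // Process the events returned by this pagination.
/// }
/// ```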
#[derive(Debug)]
pub struct BackPaginationOutcome {
    /// Did the back-pagination reach the start of the timeline?
    pub reached_start: bool,

    /// All the events that have been returned in the back-pagination
    /// request.
    pub events: Vec<TimelineEvent>,
}

/// A generic update about a room: something happened in its event cache,
/// without the detail of what. Use the room's event cache to learn more.
#[derive(Clone, Debug)]
pub struct RoomEventCacheGenericUpdate {
    /// The room that has been updated.
    pub room_id: OwnedRoomId,
}

/// An update sent whenever a linked chunk owned by the event cache has
/// changed.
#[derive(Clone, Debug)]
struct RoomEventCacheLinkedChunkUpdate {
    /// The linked chunk affected by the update.
    linked_chunk_id: OwnedLinkedChunkId,

    /// All the updates that happened during this update cycle.
    updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
}

impl RoomEventCacheLinkedChunkUpdate {
    /// Return all the new events propagated by this update.
    pub fn events(self) -> impl DoubleEndedIterator<Item = TimelineEvent> {
        use itertools::Either;
        self.updates.into_iter().flat_map(|update| match update {
            linked_chunk::Update::PushItems { items, .. } => {
                Either::Left(Either::Left(items.into_iter()))
            }
            linked_chunk::Update::ReplaceItem { item, .. } => {
                Either::Left(Either::Right(std::iter::once(item)))
            }
            linked_chunk::Update::RemoveItem { .. }
            | linked_chunk::Update::DetachLastItems { .. }
            | linked_chunk::Update::StartReattachItems
            | linked_chunk::Update::EndReattachItems
            | linked_chunk::Update::NewItemsChunk { .. }
            | linked_chunk::Update::NewGapChunk { .. }
            | linked_chunk::Update::RemoveChunk(..)
            | linked_chunk::Update::Clear => {
                // These updates don't introduce new events.
                Either::Right(std::iter::empty())
            }
        })
    }
}

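/// An update related to events happened in a room.
///
/// A sketch of how a subscriber may consume these updates, assuming a
/// `stream` obtained from `RoomEventCache::subscribe`:
///
/// ```rust,ignore
/// while let Ok(update) = stream.recv().await {
///     match update {
///         RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
///             // Apply the diffs to a local copy of the timeline.
///         }
///         RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
///             // Move the read marker in the UI.
///         }
///         _ => {}
///     }
/// }
/// ```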
#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
    /// The fully read marker has moved to a different event.
    MoveReadMarkerTo {
        /// Event at which the read marker is now pointing.
        event_id: OwnedEventId,
    },

    /// The members have changed.
    UpdateMembers {
        /// Collection of ambiguity changes that room member events trigger,
        /// keyed by the id of the `m.room.member` event.
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    },

    /// The room has received updates for the timeline as _diffs_.
    UpdateTimelineEvents {
        /// Diffs to apply to the timeline.
        diffs: Vec<VectorDiff<TimelineEvent>>,

        /// Where the diffs are coming from.
        origin: EventsOrigin,
    },

    /// The room has received new ephemeral events.
    AddEphemeralEvents {
        /// The ephemeral events received.
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    },
}

/// Indicates where events are coming from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events are coming from a sync.
    Sync,

    /// Events are coming from pagination.
    Pagination,

    /// Events are coming from the cache.
    Cache,
}

#[cfg(test)]
mod tests {
    use std::{ops::Not, sync::Arc, time::Duration};

    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::{
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
        RoomState,
    };
    use matrix_sdk_test::{
        async_test, event_factory::EventFactory, JoinedRoomBuilder, SyncResponseBuilder,
    };
    use ruma::{event_id, room_id, serde::Raw, user_id};
    use serde_json::json;
    use tokio::time::sleep;

    use super::{EventCacheError, RoomEventCacheGenericUpdate, RoomEventCacheUpdate};
    use crate::test_utils::{
        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
    };

    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // If we request a room's event cache without subscribing the event
        // cache first…
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        // …then it fails, because one must explicitly call
        // `EventCache::subscribe()` first.
        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    #[async_test]
    async fn test_uniq_read_marker() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
        let (events, mut stream) = room_event_cache.subscribe().await;

        assert!(events.is_empty());

        // When the same read marker event is received multiple times…
        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
            .await
            .unwrap();

        // …there's only one read marker update.
        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        assert!(stream.recv().now_or_never().is_none());

        // And no generic update is sent either.
        assert!(generic_stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
        client.base_client().get_or_create_room(room_id2, RoomState::Joined);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Insert two rooms with a few events each.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
        updates.joined.insert(room_id2.to_owned(), joined_room_update2);

        // Have the event cache handle them.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // Now retrieve the events one by one.
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.find_event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.find_event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        // The third event is in room2, so it won't be found in room1.
        assert!(room_event_cache.find_event(eid3).await.is_none());
    }

    #[async_test]
    async fn test_save_event() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;

        // A saved event can be found again.
        assert!(room_event_cache.find_event(event_id).await.is_some());
    }

    #[async_test]
    async fn test_generic_update_when_loading_rooms() {
        // Create a client with two rooms, one of which has an event in its
        // cache store.
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id_0 = room_id!("!raclette:patate.ch");
        let room_id_1 = room_id!("!fondue:patate.ch");

        let event_factory = EventFactory::new().room(room_id_0).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        client
            .event_cache_store()
            .lock()
            .await
            .unwrap()
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id_0),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event_factory
                            .text_msg("hello")
                            .sender(user)
                            .event_id(event_id!("$ev0"))
                            .into_event()],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Loading a room with events from the store triggers a generic
        // update.
        {
            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();

            assert_matches!(
                generic_stream.recv().await,
                Ok(RoomEventCacheGenericUpdate { room_id }) => {
                    assert_eq!(room_id, room_id_0);
                }
            );
        }

        // Loading a room with no events from the store does not trigger a
        // generic update.
        {
            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();

            assert!(generic_stream.recv().now_or_never().is_none());
        }
    }

    #[async_test]
    async fn test_generic_update_when_paginating_room() {
        // Create a client with a room, and several chunks of events in the
        // room's cache store.
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_factory = EventFactory::new().room(room_id).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        client
            .event_cache_store()
            .lock()
            .await
            .unwrap()
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // Another empty chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    // A chunk with one event.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![event_factory
                            .text_msg("hello")
                            .sender(user)
                            .event_id(event_id!("$ev0"))
                            .into_event()],
                    },
                    // Another chunk with one event.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(2)),
                        new: ChunkIdentifier::new(3),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(3), 0),
                        items: vec![event_factory
                            .text_msg("world")
                            .sender(user)
                            .event_id(event_id!("$ev1"))
                            .into_event()],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Loading the room triggers a generic update.
        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let pagination = room_event_cache.pagination();

        // Paginating backwards loads one event, and triggers a generic
        // update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert_eq!(pagination_outcome.events.len(), 1);
        assert!(pagination_outcome.reached_start.not());
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        // Paginating again reaches an empty chunk: no events, so no generic
        // update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.events.is_empty());
        assert!(pagination_outcome.reached_start.not());
        assert!(generic_stream.recv().now_or_never().is_none());

        // One more pagination reaches the start of the timeline, still with
        // no new events.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.reached_start);
        assert!(generic_stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_for_room_when_room_is_not_found() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Requesting a room that the client doesn't know about fails with
        // `RoomNotFound`.
        assert_matches!(
            event_cache.for_room(room_id).await,
            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
                assert_eq!(room_id, not_found_room_id);
            }
        );

        // Once the room exists, the same call succeeds.
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        assert!(event_cache.for_room(room_id).await.is_ok());
    }

    // Check that the event cache's background tasks don't keep a strong
    // reference to the inner state, which would create a reference cycle and
    // leak it.
    #[cfg(not(target_family = "wasm"))]
    #[async_test]
    async fn test_no_refcycle_event_cache_tasks() {
        let client = MockClientBuilder::new(None).build().await;

        // Give the background tasks some time to settle.
        sleep(Duration::from_secs(1)).await;

        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
        assert_eq!(event_cache_weak.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            // Set up the client so it knows about the room.
            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Give the spawned tasks some time to exit.
        sleep(Duration::from_secs(1)).await;

        // If this fails, the event cache is being kept alive by a reference
        // cycle, likely via one of its background tasks.
        assert_eq!(
            event_cache_weak.strong_count(),
            0,
            "Too many strong references to the event cache {}",
            event_cache_weak.strong_count()
        );
    }
}