1#![forbid(missing_docs)]
29
30use std::{
31 collections::BTreeMap,
32 fmt::Debug,
33 sync::{Arc, OnceLock},
34};
35
36use eyeball::Subscriber;
37use eyeball_im::VectorDiff;
38use matrix_sdk_base::{
39 deserialized_responses::{AmbiguityChange, TimelineEvent},
40 event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
41 linked_chunk::lazy_loader::LazyLoaderError,
42 store_locks::LockStoreError,
43 sync::RoomUpdates,
44};
45use matrix_sdk_common::executor::{spawn, JoinHandle};
46use once_cell::sync::OnceCell;
47use room::RoomEventCacheState;
48use ruma::{
49 events::{
50 relation::RelationType,
51 room::{message::Relation, redaction::SyncRoomRedactionEvent},
52 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
53 AnySyncTimelineEvent,
54 },
55 serde::Raw,
56 EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
57};
58use tokio::sync::{
59 broadcast::{error::RecvError, Receiver},
60 Mutex, RwLock,
61};
62use tracing::{error, info, info_span, instrument, trace, warn, Instrument as _, Span};
63
64use self::paginator::PaginatorError;
65use crate::{client::WeakClient, Client};
66
67mod deduplicator;
68mod pagination;
69mod room;
70
71pub mod paginator;
72pub use pagination::{PaginationToken, RoomPagination};
73pub use room::RoomEventCache;
74
/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] hasn't been subscribed to sync responses yet;
    /// [`EventCache::subscribe`] must be called first.
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// The room with the given identifier hasn't been found in the client.
    #[error("Room {0} hasn't been found in the Client.")]
    RoomNotFound(OwnedRoomId),

    /// The given back-pagination token is unknown to the event cache.
    #[error("The given back-pagination token is unknown to the event cache.")]
    UnknownBackpaginationToken,

    /// An error has been observed while back-paginating.
    #[error("Error observed while back-paginating: {0}")]
    BackpaginationError(#[from] PaginatorError),

    /// An error reported by the event cache store.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error reported while trying to lock the store.
    #[error(transparent)]
    LockingStorage(#[from] LockStoreError),

    /// The event cache only holds a weak reference to its owning client, and
    /// that client was already dropped when the cache tried to use it.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error reported by the linked chunk's lazy loader.
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),
}
121
/// A result type whose error is always an [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;
124
/// Holds the handles of the background tasks spawned by
/// [`EventCache::subscribe`].
///
/// The `Drop` implementation of this type aborts both tasks.
pub struct EventCacheDropHandles {
    /// Task running `EventCache::listen_task`, which consumes room updates.
    listen_updates_task: JoinHandle<()>,

    /// Task running `EventCache::ignore_user_list_update_task`, which reacts
    /// to changes of the ignore user list.
    ignore_user_list_update_task: JoinHandle<()>,
}
133
134impl Debug for EventCacheDropHandles {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
137 }
138}
139
140impl Drop for EventCacheDropHandles {
141 fn drop(&mut self) {
142 self.listen_updates_task.abort();
143 self.ignore_user_list_update_task.abort();
144 }
145}
146
/// A cache of events received by the client, shared across rooms.
///
/// Cloning is cheap: all clones share the same inner state through an
/// [`Arc`].
#[derive(Clone)]
pub struct EventCache {
    /// Shared state of the event cache, also handed to the background tasks.
    inner: Arc<EventCacheInner>,
}
157
158impl Debug for EventCache {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 f.debug_struct("EventCache").finish_non_exhaustive()
161 }
162}
163
164impl EventCache {
165 pub(crate) fn new(client: WeakClient) -> Self {
167 Self {
168 inner: Arc::new(EventCacheInner {
169 client,
170 store: Default::default(),
171 multiple_room_updates_lock: Default::default(),
172 by_room: Default::default(),
173 drop_handles: Default::default(),
174 all_events: Default::default(),
175 }),
176 }
177 }
178
179 pub fn enable_storage(&self) -> Result<()> {
184 let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| {
185 let client = self.inner.client()?;
186 Ok(client.event_cache_store().clone())
187 })?;
188 Ok(())
189 }
190
191 pub fn has_storage(&self) -> bool {
193 self.inner.has_storage()
194 }
195
196 pub fn subscribe(&self) -> Result<()> {
202 let client = self.inner.client()?;
203
204 let _ = self.inner.drop_handles.get_or_init(|| {
205 let listen_updates_task = spawn(Self::listen_task(
207 self.inner.clone(),
208 client.subscribe_to_all_room_updates(),
209 ));
210
211 let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
212 self.inner.clone(),
213 client.subscribe_to_ignore_user_list_changes(),
214 ));
215
216 Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task })
217 });
218
219 Ok(())
220 }
221
222 pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
226 self.inner
227 .all_events
228 .read()
229 .await
230 .events
231 .get(event_id)
232 .map(|(_room_id, event)| event.clone())
233 }
234
235 #[cfg(any(test, feature = "testing"))]
245 pub async fn empty_immutable_cache(&self) {
246 self.inner.all_events.write().await.events.clear();
247 }
248
249 #[instrument(skip_all)]
250 async fn ignore_user_list_update_task(
251 inner: Arc<EventCacheInner>,
252 mut ignore_user_list_stream: Subscriber<Vec<String>>,
253 ) {
254 let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
255 span.follows_from(Span::current());
256
257 async move {
258 while ignore_user_list_stream.next().await.is_some() {
259 info!("Received an ignore user list change");
260 if let Err(err) = inner.clear_all_rooms().await {
261 error!("when clearing room storage after ignore user list change: {err}");
262 }
263 }
264 info!("Ignore user list stream has closed");
265 }
266 .instrument(span)
267 .await;
268 }
269
270 #[instrument(skip_all)]
271 async fn listen_task(
272 inner: Arc<EventCacheInner>,
273 mut room_updates_feed: Receiver<RoomUpdates>,
274 ) {
275 trace!("Spawning the listen task");
276 loop {
277 match room_updates_feed.recv().await {
278 Ok(updates) => {
279 if let Err(err) = inner.handle_room_updates(updates).await {
280 match err {
281 EventCacheError::ClientDropped => {
282 info!("Closing the event cache global listen task because client dropped");
284 break;
285 }
286 err => {
287 error!("Error when handling room updates: {err}");
288 }
289 }
290 }
291 }
292
293 Err(RecvError::Lagged(num_skipped)) => {
294 warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
298 if let Err(err) = inner.clear_all_rooms().await {
299 error!("when clearing storage after lag in listen_task: {err}");
300 }
301 }
302
303 Err(RecvError::Closed) => {
304 info!("Closing the event cache global listen task because receiver closed");
306 break;
307 }
308 }
309 }
310 }
311
312 pub(crate) async fn for_room(
314 &self,
315 room_id: &RoomId,
316 ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
317 let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
318 return Err(EventCacheError::NotSubscribedYet);
319 };
320
321 let room = self.inner.for_room(room_id).await?;
322
323 Ok((room, drop_handles))
324 }
325
326 #[instrument(skip(self, events))]
331 pub async fn add_initial_events(
332 &self,
333 room_id: &RoomId,
334 events: Vec<TimelineEvent>,
335 prev_batch: Option<String>,
336 ) -> Result<()> {
337 if self.inner.has_storage() {
339 return Ok(());
340 }
341
342 let room_cache = self.inner.for_room(room_id).await?;
343
344 if !room_cache.inner.state.read().await.events().is_empty() {
347 return Ok(());
348 }
349
350 room_cache
355 .inner
356 .replace_all_events_by(events, prev_batch, Default::default(), Default::default())
357 .await?;
358
359 Ok(())
360 }
361}
362
/// Maps an event id to the id of the room it belongs to, and to the event
/// itself.
type AllEventsMap = BTreeMap<OwnedEventId, (OwnedRoomId, TimelineEvent)>;
/// Maps an event id to the ids of the events relating to it, each with its
/// relation type.
type RelationsMap = BTreeMap<OwnedEventId, BTreeMap<OwnedEventId, RelationType>>;
365
/// In-memory cache of events and of the relations between them, shared by all
/// the rooms.
#[derive(Default, Clone)]
struct AllEventsCache {
    /// All the cached events, indexed by event id.
    events: AllEventsMap,
    /// For each event id, the events known to relate to it.
    relations: RelationsMap,
}
376
377impl AllEventsCache {
378 fn clear(&mut self) {
379 self.events.clear();
380 self.relations.clear();
381 }
382
383 fn append_related_event(&mut self, event: &TimelineEvent) {
386 let Ok(AnySyncTimelineEvent::MessageLike(ev)) = event.raw().deserialize() else {
388 return;
389 };
390
391 if let AnySyncMessageLikeEvent::RoomRedaction(room_redaction) = &ev {
393 let redacted_event_id = match room_redaction {
394 SyncRoomRedactionEvent::Original(ev) => {
395 ev.content.redacts.as_ref().or(ev.redacts.as_ref())
396 }
397 SyncRoomRedactionEvent::Redacted(redacted_redaction) => {
398 redacted_redaction.content.redacts.as_ref()
399 }
400 };
401
402 if let Some(redacted_event_id) = redacted_event_id {
403 self.relations
404 .entry(redacted_event_id.to_owned())
405 .or_default()
406 .insert(ev.event_id().to_owned(), RelationType::Replacement);
407 }
408
409 return;
410 }
411
412 let relationship = match ev.original_content() {
413 Some(AnyMessageLikeEventContent::RoomMessage(c)) => {
414 if let Some(relation) = c.relates_to {
415 match relation {
416 Relation::Replacement(replacement) => {
417 Some((replacement.event_id, RelationType::Replacement))
418 }
419 Relation::Reply { in_reply_to } => {
420 Some((in_reply_to.event_id, RelationType::Reference))
421 }
422 Relation::Thread(thread) => Some((thread.event_id, RelationType::Thread)),
423 _ => None,
425 }
426 } else {
427 None
428 }
429 }
430 Some(AnyMessageLikeEventContent::PollResponse(c)) => {
431 Some((c.relates_to.event_id, RelationType::Reference))
432 }
433 Some(AnyMessageLikeEventContent::PollEnd(c)) => {
434 Some((c.relates_to.event_id, RelationType::Reference))
435 }
436 Some(AnyMessageLikeEventContent::UnstablePollResponse(c)) => {
437 Some((c.relates_to.event_id, RelationType::Reference))
438 }
439 Some(AnyMessageLikeEventContent::UnstablePollEnd(c)) => {
440 Some((c.relates_to.event_id, RelationType::Reference))
441 }
442 Some(AnyMessageLikeEventContent::Reaction(c)) => {
443 Some((c.relates_to.event_id, RelationType::Annotation))
444 }
445 _ => None,
446 };
447
448 if let Some(relationship) = relationship {
449 self.relations
450 .entry(relationship.0)
451 .or_default()
452 .insert(ev.event_id().to_owned(), relationship.1);
453 }
454 }
455
456 fn collect_related_events(
460 &self,
461 event_id: &EventId,
462 filter: Option<&[RelationType]>,
463 ) -> Vec<TimelineEvent> {
464 let mut results = Vec::new();
465 self.collect_related_events_rec(event_id, filter, &mut results);
466 results
467 }
468
469 fn collect_related_events_rec(
470 &self,
471 event_id: &EventId,
472 filter: Option<&[RelationType]>,
473 results: &mut Vec<TimelineEvent>,
474 ) {
475 let Some(related_event_ids) = self.relations.get(event_id) else {
476 return;
477 };
478
479 for (related_event_id, relation_type) in related_event_ids {
480 if let Some(filter) = filter {
481 if !filter.contains(relation_type) {
482 continue;
483 }
484 }
485
486 if results.iter().any(|event| {
488 event.event_id().is_some_and(|added_related_event_id| {
489 added_related_event_id == *related_event_id
490 })
491 }) {
492 continue;
493 }
494
495 if let Some((_, ev)) = self.events.get(related_event_id) {
496 results.push(ev.clone());
497 self.collect_related_events_rec(related_event_id, filter, results);
498 }
499 }
500 }
501}
502
/// State shared by all the clones of an [`EventCache`] and by its background
/// tasks.
struct EventCacheInner {
    /// Weak reference to the owning client; it may point to a client that has
    /// been dropped, in which case `client()` fails with `ClientDropped`.
    client: WeakClient,

    /// The store lock, set at most once by `EventCache::enable_storage`, and
    /// handed (as a clone of the `Arc`) to every room state created in
    /// `for_room`.
    store: Arc<OnceCell<EventCacheStoreLock>>,

    /// Lock held for the whole duration of `handle_room_updates`, so that two
    /// sets of room updates aren't processed concurrently.
    multiple_room_updates_lock: Mutex<()>,

    /// The per-room caches, created lazily by `for_room`.
    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,

    /// Events and relations cache, shared with every [`RoomEventCache`].
    all_events: Arc<RwLock<AllEventsCache>>,

    /// Handles to the background tasks, set once by `EventCache::subscribe`.
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
}
538
539impl EventCacheInner {
540 fn client(&self) -> Result<Client> {
541 self.client.get().ok_or(EventCacheError::ClientDropped)
542 }
543
544 fn has_storage(&self) -> bool {
546 self.store.get().is_some()
547 }
548
549 async fn clear_all_rooms(&self) -> Result<()> {
551 let rooms = self.by_room.write().await;
559 for room in rooms.values() {
560 let updates_as_vector_diffs = room.inner.state.write().await.reset().await?;
562
563 let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
566 diffs: updates_as_vector_diffs,
567 origin: EventsOrigin::Sync,
568 });
569 }
570
571 Ok(())
572 }
573
574 #[instrument(skip(self, updates))]
576 async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
577 let _lock = self.multiple_room_updates_lock.lock().await;
580
581 for (room_id, left_room_update) in updates.leave {
583 let room = self.for_room(&room_id).await?;
584
585 if let Err(err) =
586 room.inner.handle_left_room_update(self.has_storage(), left_room_update).await
587 {
588 error!("handling left room update: {err}");
590 }
591 }
592
593 for (room_id, joined_room_update) in updates.join {
595 let room = self.for_room(&room_id).await?;
596
597 if let Err(err) =
598 room.inner.handle_joined_room_update(self.has_storage(), joined_room_update).await
599 {
600 error!("handling joined room update: {err}");
602 }
603 }
604
605 Ok(())
609 }
610
611 async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
616 let by_room_guard = self.by_room.read().await;
619
620 match by_room_guard.get(room_id) {
621 Some(room) => Ok(room.clone()),
622
623 None => {
624 drop(by_room_guard);
626 let mut by_room_guard = self.by_room.write().await;
627
628 if let Some(room) = by_room_guard.get(room_id) {
631 return Ok(room.clone());
632 }
633
634 let room_state =
635 RoomEventCacheState::new(room_id.to_owned(), self.store.clone()).await?;
636
637 let room_version = self
638 .client
639 .get()
640 .and_then(|client| client.get_room(room_id))
641 .map(|room| room.clone_info().room_version_or_default())
642 .unwrap_or_else(|| {
643 warn!("unknown room version for {room_id}, using default V1");
644 RoomVersionId::V1
645 });
646
647 let room_event_cache = RoomEventCache::new(
648 self.client.clone(),
649 room_state,
650 room_id.to_owned(),
651 room_version,
652 self.all_events.clone(),
653 );
654
655 by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
656
657 Ok(room_event_cache)
658 }
659 }
660 }
661}
662
/// The outcome of a back-pagination.
#[derive(Debug)]
pub struct BackPaginationOutcome {
    /// Whether the back-pagination reached the start of the timeline.
    pub reached_start: bool,

    /// The events returned by the back-pagination.
    // NOTE(review): the ordering of these events isn't visible from this
    // file — confirm against the paginator before documenting it.
    pub events: Vec<TimelineEvent>,
}
677
/// An update sent by a room's event cache to its observers.
#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
    /// The read marker has moved to another event.
    MoveReadMarkerTo {
        /// The event the read marker now points to.
        event_id: OwnedEventId,
    },

    /// Information about the room's members has changed.
    UpdateMembers {
        /// The ambiguity changes, indexed by event id.
        // NOTE(review): presumably keyed by the id of the member event
        // causing the change — confirm against the producer.
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    },

    /// The room's timeline events have changed.
    UpdateTimelineEvents {
        /// The changes, expressed as a list of [`VectorDiff`]s.
        diffs: Vec<VectorDiff<TimelineEvent>>,

        /// Where these changes come from.
        origin: EventsOrigin,
    },

    /// Ephemeral events have been received for the room.
    AddEphemeralEvents {
        /// The ephemeral events, in raw form.
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    },
}
712
/// Indicates where a set of events comes from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// The events come from a sync.
    Sync,

    /// The events come from a pagination.
    Pagination,
}
722
#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::sync::{JoinedRoomUpdate, RoomUpdates, Timeline};
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{event_id, room_id, serde::Raw, user_id};
    use serde_json::json;

    use super::{EventCacheError, RoomEventCacheUpdate};
    use crate::test_utils::{assert_event_matches_msg, logged_in_client};

    /// Requesting a room's event cache before `subscribe()` has been called
    /// must fail with `NotSubscribedYet`.
    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // The event cache hasn't been subscribed at this point.
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    /// Receiving the same read marker event many times in a single update
    /// must result in a single `MoveReadMarkerTo` update.
    #[async_test]
    async fn test_uniq_read_marker() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();

        let (events, mut stream) = room_event_cache.subscribe().await;

        assert!(events.is_empty());

        // Build an account data list made of 100 copies of the same read
        // marker event.
        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .inner
            .handle_joined_room_update(
                event_cache.inner.has_storage(),
                JoinedRoomUpdate { account_data, ..Default::default() },
            )
            .await
            .unwrap();

        // The first update is the read marker move…
        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        // … and no other update is immediately available after it.
        assert!(stream.recv().now_or_never().is_none());
    }

    /// Events received through room updates can be found by id, both from the
    /// global event cache and from the cache of the room they belong to.
    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Note: the factory is bound to `room_id1`, and is used to create the
        // events of both updates.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        // Two events for the first room.
        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        // One event for the second room.
        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.join.insert(room_id1.to_owned(), joined_room_update1);
        updates.join.insert(room_id2.to_owned(), joined_room_update2);

        // Feed the updates to the event cache directly, without going through
        // the listen task.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // All three events can be found from the global event cache.
        let found1 = event_cache.event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = event_cache.event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        let found3 = event_cache.event(eid3).await.unwrap();
        assert_event_matches_msg(&found3, "bjr");

        // An unknown event id yields nothing.
        assert!(event_cache.event(event_id!("$unknown")).await.is_none());

        // The events of the first room can be found from its own cache too.
        client.base_client().get_or_create_room(room_id1, matrix_sdk_base::RoomState::Joined);
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        // The event of the second room isn't visible from the first room's
        // cache, but it's still known to the global event cache.
        assert!(room_event_cache.event(eid3).await.is_none());
        assert!(event_cache.event(eid3).await.is_some());
    }

    /// An event saved through a room's cache is visible from both the room
    /// and the global caches, until the global events map is emptied.
    #[async_test]
    async fn test_save_event_and_clear() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_event(f.text_msg("hey there").event_id(event_id).into()).await;

        // The saved event is visible from both caches.
        assert!(room_event_cache.event(event_id).await.is_some());
        assert!(event_cache.event(event_id).await.is_some());

        event_cache.empty_immutable_cache().await;

        // After emptying the events map, it can't be found anymore.
        assert!(room_event_cache.event(event_id).await.is_none());
        assert!(event_cache.event(event_id).await.is_none());
    }

    /// Initial events added to a room are returned when subscribing to that
    /// room's event cache.
    #[async_test]
    async fn test_add_initial_events() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        event_cache
            .add_initial_events(room_id, vec![f.text_msg("hey").into()], None)
            .await
            .unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        let (initial_events, _) = room_event_cache.subscribe().await;
        assert_eq!(initial_events.len(), 1);
    }
}