matrix_sdk/event_cache/
mod.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The event cache is an abstraction layer, sitting between the Rust SDK and a
//! final client, that acts as a global observer of all the rooms, gathering and
//! inferring some extra useful information about each room. In particular, this
//! doesn't require subscribing to a specific room to get access to this
//! information.
//!
//! It's intended to be fast, robust and easy to maintain, having learned from
//! previous endeavours at implementing middle to high level features elsewhere
//! in the SDK, notably in the UI's Timeline object.
//!
//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
//! details about the historical reasons that led us to start writing this.
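//!
//! A minimal usage sketch follows (the `client` is assumed to already exist,
//! and the re-export paths spelled out in the hidden lines are assumptions):
//!
//! ```no_run
//! # async fn example(
//! #     client: &matrix_sdk::Client,
//! #     room_id: &matrix_sdk::ruma::RoomId,
//! # ) -> Result<(), Box<dyn std::error::Error>> {
//! // The event cache must explicitly subscribe to sync responses first.
//! let event_cache = client.event_cache();
//! event_cache.subscribe()?;
//!
//! // A room-wide view is then obtained through the room itself.
//! let room = client.get_room(room_id).ok_or("unknown room")?;
//! let (room_event_cache, _drop_handles) = room.event_cache().await?;
//!
//! // Subscribing yields the current events plus a stream of updates.
//! let (_initial_events, mut updates) = room_event_cache.subscribe().await;
//! while let Ok(_update) = updates.recv().await {
//!     // React to `RoomEventCacheUpdate`s here.
//! }
//! # Ok(())
//! # }
//! ```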

#![forbid(missing_docs)]

use std::{
    collections::BTreeMap,
    fmt::Debug,
    sync::{Arc, OnceLock},
};

use eyeball::Subscriber;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
    deserialized_responses::{AmbiguityChange, TimelineEvent},
    event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
    linked_chunk::lazy_loader::LazyLoaderError,
    store_locks::LockStoreError,
    sync::RoomUpdates,
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use once_cell::sync::OnceCell;
use room::RoomEventCacheState;
use ruma::{
    events::{
        relation::RelationType,
        room::{message::Relation, redaction::SyncRoomRedactionEvent},
        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
        AnySyncTimelineEvent,
    },
    serde::Raw,
    EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
};
use tokio::sync::{
    broadcast::{error::RecvError, Receiver},
    Mutex, RwLock,
};
use tracing::{error, info, info_span, instrument, trace, warn, Instrument as _, Span};

use self::paginator::PaginatorError;
use crate::{client::WeakClient, Client};

mod deduplicator;
mod pagination;
mod room;

pub mod paginator;
pub use pagination::{PaginationToken, RoomPagination};
pub use room::RoomEventCache;

/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] instance hasn't been initialized with
    /// [`EventCache::subscribe`]
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// The room hasn't been found in the client.
    ///
    /// Technically, it's possible to request a [`RoomEventCache`] for a room
    /// that is not known to the client, leading to this error.
    #[error("Room {0} hasn't been found in the Client.")]
    RoomNotFound(OwnedRoomId),

    /// The given back-pagination token is unknown to the event cache.
    #[error("The given back-pagination token is unknown to the event cache.")]
    UnknownBackpaginationToken,

    /// An error has been observed while back-paginating.
    #[error("Error observed while back-paginating: {0}")]
    BackpaginationError(#[from] PaginatorError),

    /// An error happening when interacting with storage.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happening when attempting to (cross-process) lock storage.
    #[error(transparent)]
    LockingStorage(#[from] LockStoreError),

    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
    /// to. It's possible this weak reference points to nothing anymore, at
    /// times where we try to use the client.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happening when interacting with the [`LinkedChunk`]'s lazy
    /// loader.
    ///
    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),
}

/// A result using the [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;

/// Hold handles to the tasks spawned by a [`RoomEventCache`].
pub struct EventCacheDropHandles {
    /// Task that listens to room updates.
    listen_updates_task: JoinHandle<()>,

    /// Task that listens to updates to the user's ignored list.
    ignore_user_list_update_task: JoinHandle<()>,
}

impl Debug for EventCacheDropHandles {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
    }
}

impl Drop for EventCacheDropHandles {
    fn drop(&mut self) {
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
    }
}

/// An event cache, providing lots of useful functionality for clients.
///
/// Cloning is shallow, and thus is cheap to do.
///
/// See also the module-level comment.
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache.
    inner: Arc<EventCacheInner>,
}

impl Debug for EventCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventCache").finish_non_exhaustive()
    }
}

impl EventCache {
    /// Create a new [`EventCache`] for the given client.
    pub(crate) fn new(client: WeakClient) -> Self {
        Self {
            inner: Arc::new(EventCacheInner {
                client,
                store: Default::default(),
                multiple_room_updates_lock: Default::default(),
                by_room: Default::default(),
                drop_handles: Default::default(),
                all_events: Default::default(),
            }),
        }
    }

    /// Enable storing updates to storage, and reload events from storage.
    ///
    /// Has an effect only the first time it's called. It's safe to call it
    /// multiple times.
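    ///
    /// A minimal sketch follows (assuming an existing `client`; the
    /// `matrix_sdk::event_cache` path in the hidden line is an assumption):
    ///
    /// ```no_run
    /// # fn example(client: &matrix_sdk::Client) -> Result<(), matrix_sdk::event_cache::EventCacheError> {
    /// let event_cache = client.event_cache();
    /// event_cache.enable_storage()?;
    /// assert!(event_cache.has_storage());
    /// # Ok(())
    /// # }
    /// ```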
    pub fn enable_storage(&self) -> Result<()> {
        let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| {
            let client = self.inner.client()?;
            Ok(client.event_cache_store().clone())
        })?;
        Ok(())
    }

    /// Check whether the storage is enabled or not.
    pub fn has_storage(&self) -> bool {
        self.inner.has_storage()
    }

    /// Starts subscribing the [`EventCache`] to sync responses, if not done
    /// before.
    ///
    /// Re-running this has no effect if we already subscribed before, and is
    /// cheap.
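    ///
    /// A minimal sketch (assuming an existing `client`; the error path in the
    /// hidden line is an assumption):
    ///
    /// ```no_run
    /// # fn example(client: &matrix_sdk::Client) -> Result<(), matrix_sdk::event_cache::EventCacheError> {
    /// let event_cache = client.event_cache();
    /// // Idempotent: calling it a second time is a cheap no-op.
    /// event_cache.subscribe()?;
    /// event_cache.subscribe()?;
    /// # Ok(())
    /// # }
    /// ```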
    pub fn subscribe(&self) -> Result<()> {
        let client = self.inner.client()?;

        let _ = self.inner.drop_handles.get_or_init(|| {
            // Spawn the task that will listen to all the room updates at once.
            let listen_updates_task = spawn(Self::listen_task(
                self.inner.clone(),
                client.subscribe_to_all_room_updates(),
            ));

            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
                self.inner.clone(),
                client.subscribe_to_ignore_user_list_changes(),
            ));

            Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task })
        });

        Ok(())
    }

    /// Try to find an event by its ID in all the rooms.
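    ///
    /// A minimal lookup sketch (the paths spelled out in the hidden lines are
    /// assumptions):
    ///
    /// ```no_run
    /// # async fn example(
    /// #     event_cache: &matrix_sdk::event_cache::EventCache,
    /// #     event_id: &matrix_sdk::ruma::EventId,
    /// # ) {
    /// // `None` simply means no known room has seen this event yet.
    /// if let Some(event) = event_cache.event(event_id).await {
    ///     println!("found {:?}", event.event_id());
    /// }
    /// # }
    /// ```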
    // Note: replace this with a select-by-id query when this is implemented in a
    // store.
    pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
        self.inner
            .all_events
            .read()
            .await
            .events
            .get(event_id)
            .map(|(_room_id, event)| event.clone())
    }

    /// Clear all the events from the immutable event cache.
    ///
    /// This keeps all the rooms along with their internal events linked chunks,
    /// but it clears the side immutable cache for events.
    ///
    /// As such, it doesn't emit any [`RoomEventCacheUpdate`], and it's expected
    /// to be only useful in testing contexts.
    // Note: replace this with a remove query when this is implemented in a
    // store.
    #[cfg(any(test, feature = "testing"))]
    pub async fn empty_immutable_cache(&self) {
        self.inner.all_events.write().await.events.clear();
    }

    #[instrument(skip_all)]
    async fn ignore_user_list_update_task(
        inner: Arc<EventCacheInner>,
        mut ignore_user_list_stream: Subscriber<Vec<String>>,
    ) {
        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
        span.follows_from(Span::current());

        async move {
            while ignore_user_list_stream.next().await.is_some() {
                info!("Received an ignore user list change");
                if let Err(err) = inner.clear_all_rooms().await {
                    error!("when clearing room storage after ignore user list change: {err}");
                }
            }
            info!("Ignore user list stream has closed");
        }
        .instrument(span)
        .await;
    }

    #[instrument(skip_all)]
    async fn listen_task(
        inner: Arc<EventCacheInner>,
        mut room_updates_feed: Receiver<RoomUpdates>,
    ) {
        trace!("Spawning the listen task");
        loop {
            match room_updates_feed.recv().await {
                Ok(updates) => {
                    if let Err(err) = inner.handle_room_updates(updates).await {
                        match err {
                            EventCacheError::ClientDropped => {
                                // The client has dropped, exit the listen task.
                                info!("Closing the event cache global listen task because client dropped");
                                break;
                            }
                            err => {
                                error!("Error when handling room updates: {err}");
                            }
                        }
                    }
                }

                Err(RecvError::Lagged(num_skipped)) => {
                    // Forget everything we know; we could have missed events, and we have
                    // no way to reconcile at the moment!
                    // TODO: implement Smart Matching™,
                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
                    if let Err(err) = inner.clear_all_rooms().await {
                        error!("when clearing storage after lag in listen_task: {err}");
                    }
                }

                Err(RecvError::Closed) => {
                    // The sender has shut down, exit.
                    info!("Closing the event cache global listen task because receiver closed");
                    break;
                }
            }
        }
    }

    /// Return a room-specific view over the [`EventCache`].
    pub(crate) async fn for_room(
        &self,
        room_id: &RoomId,
    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
            return Err(EventCacheError::NotSubscribedYet);
        };

        let room = self.inner.for_room(room_id).await?;

        Ok((room, drop_handles))
    }

    /// Add an initial set of events to the event cache, reloaded from a cache.
    ///
    /// TODO: temporary for API compat, as the event cache should take care of
    /// its own store.
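    ///
    /// A minimal sketch follows (the re-export paths in the hidden lines are
    /// assumptions, and the events are assumed to come from a previous cache):
    ///
    /// ```no_run
    /// # async fn example(
    /// #     event_cache: &matrix_sdk::event_cache::EventCache,
    /// #     room_id: &matrix_sdk::ruma::RoomId,
    /// #     cached_events: Vec<matrix_sdk::deserialized_responses::TimelineEvent>,
    /// # ) -> Result<(), matrix_sdk::event_cache::EventCacheError> {
    /// // Seed the room once, before its first sync response is handled. A
    /// // `prev_batch` token could be passed instead of `None` to allow
    /// // back-paginating from the cached events.
    /// event_cache.add_initial_events(room_id, cached_events, None).await?;
    /// # Ok(())
    /// # }
    /// ```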
    #[instrument(skip(self, events))]
    pub async fn add_initial_events(
        &self,
        room_id: &RoomId,
        events: Vec<TimelineEvent>,
        prev_batch: Option<String>,
    ) -> Result<()> {
        // If the event cache's storage has been enabled, do nothing.
        if self.inner.has_storage() {
            return Ok(());
        }

        let room_cache = self.inner.for_room(room_id).await?;

        // If the linked chunk already has at least one event, ignore this request, as
        // it should happen at most once per room.
        if !room_cache.inner.state.read().await.events().is_empty() {
            return Ok(());
        }

        // We could have received events during a previous sync; remove them all, since
        // we can't know where to insert the "initial events" with respect to
        // them.

        room_cache
            .inner
            .replace_all_events_by(events, prev_batch, Default::default(), Default::default())
            .await?;

        Ok(())
    }
}

type AllEventsMap = BTreeMap<OwnedEventId, (OwnedRoomId, TimelineEvent)>;
type RelationsMap = BTreeMap<OwnedEventId, BTreeMap<OwnedEventId, RelationType>>;

/// Cache wrapper containing both copies of received events and lists of event
/// ids related to them.
#[derive(Default, Clone)]
struct AllEventsCache {
    /// A cache of received events mapped by their event id.
    events: AllEventsMap,
    /// A cache of related event ids for an event id. The key is the original
    /// event id and the value a list of event ids related to it.
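    ///
    /// For instance (event ids purely illustrative), a reaction `$r`
    /// annotating a message `$m` is recorded as
    /// `relations[$m] = { $r => Annotation }`, which
    /// `collect_related_events` then walks recursively.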
    relations: RelationsMap,
}

impl AllEventsCache {
    fn clear(&mut self) {
        self.events.clear();
        self.relations.clear();
    }

    /// If the event is related to another one, its id is added to the relations
    /// map.
    fn append_related_event(&mut self, event: &TimelineEvent) {
        // Handle and cache events and relations.
        let Ok(AnySyncTimelineEvent::MessageLike(ev)) = event.raw().deserialize() else {
            return;
        };

        // Handle redactions separately, as their logic is slightly different.
        if let AnySyncMessageLikeEvent::RoomRedaction(room_redaction) = &ev {
            let redacted_event_id = match room_redaction {
                SyncRoomRedactionEvent::Original(ev) => {
                    ev.content.redacts.as_ref().or(ev.redacts.as_ref())
                }
                SyncRoomRedactionEvent::Redacted(redacted_redaction) => {
                    redacted_redaction.content.redacts.as_ref()
                }
            };

            if let Some(redacted_event_id) = redacted_event_id {
                self.relations
                    .entry(redacted_event_id.to_owned())
                    .or_default()
                    .insert(ev.event_id().to_owned(), RelationType::Replacement);
            }

            return;
        }

        let relationship = match ev.original_content() {
            Some(AnyMessageLikeEventContent::RoomMessage(c)) => {
                if let Some(relation) = c.relates_to {
                    match relation {
                        Relation::Replacement(replacement) => {
                            Some((replacement.event_id, RelationType::Replacement))
                        }
                        Relation::Reply { in_reply_to } => {
                            Some((in_reply_to.event_id, RelationType::Reference))
                        }
                        Relation::Thread(thread) => Some((thread.event_id, RelationType::Thread)),
                        // Do nothing for custom
                        _ => None,
                    }
                } else {
                    None
                }
            }
            Some(AnyMessageLikeEventContent::PollResponse(c)) => {
                Some((c.relates_to.event_id, RelationType::Reference))
            }
            Some(AnyMessageLikeEventContent::PollEnd(c)) => {
                Some((c.relates_to.event_id, RelationType::Reference))
            }
            Some(AnyMessageLikeEventContent::UnstablePollResponse(c)) => {
                Some((c.relates_to.event_id, RelationType::Reference))
            }
            Some(AnyMessageLikeEventContent::UnstablePollEnd(c)) => {
                Some((c.relates_to.event_id, RelationType::Reference))
            }
            Some(AnyMessageLikeEventContent::Reaction(c)) => {
                Some((c.relates_to.event_id, RelationType::Annotation))
            }
            _ => None,
        };

        if let Some(relationship) = relationship {
            self.relations
                .entry(relationship.0)
                .or_default()
                .insert(ev.event_id().to_owned(), relationship.1);
        }
    }

    /// Looks for events related to the passed event id and returns them,
    /// recursively collecting the events related to those as well.
    fn collect_related_events(
        &self,
        event_id: &EventId,
        filter: Option<&[RelationType]>,
    ) -> Vec<TimelineEvent> {
        let mut results = Vec::new();
        self.collect_related_events_rec(event_id, filter, &mut results);
        results
    }

    fn collect_related_events_rec(
        &self,
        event_id: &EventId,
        filter: Option<&[RelationType]>,
        results: &mut Vec<TimelineEvent>,
    ) {
        let Some(related_event_ids) = self.relations.get(event_id) else {
            return;
        };

        for (related_event_id, relation_type) in related_event_ids {
            if let Some(filter) = filter {
                if !filter.contains(relation_type) {
                    continue;
                }
            }

            // If the event was already added to the related ones, skip it.
            if results.iter().any(|event| {
                event.event_id().is_some_and(|added_related_event_id| {
                    added_related_event_id == *related_event_id
                })
            }) {
                continue;
            }

            if let Some((_, ev)) = self.events.get(related_event_id) {
                results.push(ev.clone());
                self.collect_related_events_rec(related_event_id, filter, results);
            }
        }
    }
}

struct EventCacheInner {
    /// A weak reference to the inner client, useful when trying to get a handle
    /// on the owning client.
    client: WeakClient,

    /// Reference to the underlying store.
    ///
    /// Set to none if we shouldn't use storage for reading / writing linked
    /// chunks.
    store: Arc<OnceCell<EventCacheStoreLock>>,

    /// A lock used when many rooms must be updated at once.
    ///
    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
    /// ensure that multiple updates will be applied in the correct order, which
    /// is enforced by taking this lock when handling an update.
    // TODO: that's the place to add a cross-process lock!
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-filled cache of live [`RoomEventCache`], once per room.
    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,

    /// All events, keyed by event id.
    ///
    /// Since events are immutable in Matrix, this is append-only — events can
    /// be updated, though (e.g. if it was encrypted before, and
    /// successfully decrypted later).
    ///
    /// This is shared between the [`EventCacheInner`] singleton and all
    /// [`RoomEventCacheInner`] instances.
    all_events: Arc<RwLock<AllEventsCache>>,

    /// Handles to keep alive the task listening to updates.
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
}

impl EventCacheInner {
    fn client(&self) -> Result<Client> {
        self.client.get().ok_or(EventCacheError::ClientDropped)
    }

    /// Has persistent storage been enabled for the event cache?
    fn has_storage(&self) -> bool {
        self.store.get().is_some()
    }

    /// Clears all the room's data.
    async fn clear_all_rooms(&self) -> Result<()> {
        // Note: one must NOT clear the `by_room` map, because if something subscribed
        // to a room update, they would never get any new update for that room, since
        // re-creating the `RoomEventCache` would create a new unrelated sender.

        // Note 2: we don't need to clear the [`Self::events`] map, because events are
        // immutable in the Matrix protocol.

        let rooms = self.by_room.write().await;
        for room in rooms.values() {
            // Clear all the room state.
            let updates_as_vector_diffs = room.inner.state.write().await.reset().await?;

            // Notify all the observers that we've lost track of state. (We ignore the
            // error if there aren't any.)
            let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: updates_as_vector_diffs,
                origin: EventsOrigin::Sync,
            });
        }

        Ok(())
    }

    /// Handles a single set of room updates at once.
    #[instrument(skip(self, updates))]
    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        // First, take the lock that indicates we're processing updates, to avoid
        // handling multiple updates concurrently.
        let _lock = self.multiple_room_updates_lock.lock().await;

        // Left rooms.
        for (room_id, left_room_update) in updates.leave {
            let room = self.for_room(&room_id).await?;

            if let Err(err) =
                room.inner.handle_left_room_update(self.has_storage(), left_room_update).await
            {
                // Non-fatal error, try to continue to the next room.
                error!("handling left room update: {err}");
            }
        }

        // Joined rooms.
        for (room_id, joined_room_update) in updates.join {
            let room = self.for_room(&room_id).await?;

            if let Err(err) =
                room.inner.handle_joined_room_update(self.has_storage(), joined_room_update).await
            {
                // Non-fatal error, try to continue to the next room.
                error!("handling joined room update: {err}");
            }
        }

        // Invited rooms.
        // TODO: we don't do anything with `updates.invite` at this point.

        Ok(())
    }

    /// Return a room-specific view over the [`EventCache`].
    ///
    /// If the room isn't known to the client yet, the cache entry is still
    /// created; in that case a default room version is assumed (see the
    /// warning in the body).
    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
        // write lock.
        let by_room_guard = self.by_room.read().await;

        match by_room_guard.get(room_id) {
            Some(room) => Ok(room.clone()),

            None => {
                // Slow-path: the entry doesn't exist; let's acquire a write lock.
                drop(by_room_guard);
                let mut by_room_guard = self.by_room.write().await;

                // In the meanwhile, some other caller might have obtained write access and done
                // the same, so check for existence again.
                if let Some(room) = by_room_guard.get(room_id) {
                    return Ok(room.clone());
                }

                let room_state =
                    RoomEventCacheState::new(room_id.to_owned(), self.store.clone()).await?;

                let room_version = self
                    .client
                    .get()
                    .and_then(|client| client.get_room(room_id))
                    .map(|room| room.clone_info().room_version_or_default())
                    .unwrap_or_else(|| {
                        warn!("unknown room version for {room_id}, using default V1");
                        RoomVersionId::V1
                    });

                let room_event_cache = RoomEventCache::new(
                    self.client.clone(),
                    room_state,
                    room_id.to_owned(),
                    room_version,
                    self.all_events.clone(),
                );

                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());

                Ok(room_event_cache)
            }
        }
    }
}

/// The result of a single back-pagination request.
#[derive(Debug)]
pub struct BackPaginationOutcome {
    /// Did the back-pagination reach the start of the timeline?
    pub reached_start: bool,

    /// All the events that have been returned in the back-pagination
    /// request.
    ///
    /// Events are presented in reverse order: the first element of the vec,
    /// if present, is the most "recent" event from the chunk (or
    /// technically, the last one in the topological ordering).
    pub events: Vec<TimelineEvent>,
}

/// An update related to events that happened in a room.
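///
/// A minimal consumption sketch (assuming an already-obtained
/// `RoomEventCache`; the paths in the hidden line are assumptions):
///
/// ```no_run
/// # async fn example(room_event_cache: matrix_sdk::event_cache::RoomEventCache) {
/// use matrix_sdk::event_cache::RoomEventCacheUpdate;
///
/// let (_initial_events, mut updates) = room_event_cache.subscribe().await;
///
/// while let Ok(update) = updates.recv().await {
///     match update {
///         RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
///             // Apply the `VectorDiff`s to a local copy of the room's timeline.
///             let _ = (diffs, origin);
///         }
///         // Read marker moves, member changes and ephemeral events.
///         _ => {}
///     }
/// }
/// # }
/// ```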
#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
    /// The fully read marker has moved to a different event.
    MoveReadMarkerTo {
        /// Event at which the read marker is now pointing.
        event_id: OwnedEventId,
    },

    /// The members have changed.
    UpdateMembers {
        /// Collection of ambiguity changes that room member events trigger.
        ///
        /// This is a map of event ID of the `m.room.member` event to the
        /// details of the ambiguity change.
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    },

    /// The room has received updates for the timeline as _diffs_.
    UpdateTimelineEvents {
        /// Diffs to apply to the timeline.
        diffs: Vec<VectorDiff<TimelineEvent>>,

        /// Where the diffs are coming from.
        origin: EventsOrigin,
    },

    /// The room has received new ephemeral events.
    AddEphemeralEvents {
        /// XXX: this is temporary, until read receipts are handled in the event
        /// cache
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    },
}

/// Indicate where events are coming from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events are coming from a sync.
    Sync,

    /// Events are coming from pagination.
    Pagination,
}

#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::sync::{JoinedRoomUpdate, RoomUpdates, Timeline};
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{event_id, room_id, serde::Raw, user_id};
    use serde_json::json;

    use super::{EventCacheError, RoomEventCacheUpdate};
    use crate::test_utils::{assert_event_matches_msg, logged_in_client};

    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // If I create a room event subscriber for a room before subscribing the event
        // cache,
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        // Then it fails, because one must explicitly call `.subscribe()` on the event
        // cache.
        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    #[async_test]
    async fn test_uniq_read_marker() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();

        let (events, mut stream) = room_event_cache.subscribe().await;

        assert!(events.is_empty());

        // When sending the same read marker event multiple times,…
        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .inner
            .handle_joined_room_update(
                event_cache.inner.has_storage(),
                JoinedRoomUpdate { account_data, ..Default::default() },
            )
            .await
            .unwrap();

        // … there's only one read marker update.
        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        assert!(stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Insert two rooms with a few events.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.join.insert(room_id1.to_owned(), joined_room_update1);
        updates.join.insert(room_id2.to_owned(), joined_room_update2);

        // Have the event cache handle them.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // Now retrieve all the events one by one.
        let found1 = event_cache.event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = event_cache.event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        let found3 = event_cache.event(eid3).await.unwrap();
        assert_event_matches_msg(&found3, "bjr");

        // An unknown event won't be found.
        assert!(event_cache.event(event_id!("$unknown")).await.is_none());

        // Can also find events in a single room.
        client.base_client().get_or_create_room(room_id1, matrix_sdk_base::RoomState::Joined);
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        // Retrieving the event with id3 from the room which doesn't contain it will
        // fail…
        assert!(room_event_cache.event(eid3).await.is_none());
        // …but it doesn't fail at the client-wide level.
        assert!(event_cache.event(eid3).await.is_some());
    }

    #[async_test]
    async fn test_save_event_and_clear() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_event(f.text_msg("hey there").event_id(event_id).into()).await;

        // Retrieving the event at the room-wide cache works.
        assert!(room_event_cache.event(event_id).await.is_some());
        // Also at the client level.
        assert!(event_cache.event(event_id).await.is_some());

        event_cache.empty_immutable_cache().await;

        // After clearing, both fail to find the event.
        assert!(room_event_cache.event(event_id).await.is_none());
        assert!(event_cache.event(event_id).await.is_none());
    }

    #[async_test]
    async fn test_add_initial_events() {
        // TODO: remove this test when the event cache uses its own persistent storage.
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        event_cache
            .add_initial_events(room_id, vec![f.text_msg("hey").into()], None)
            .await
            .unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        let (initial_events, _) = room_event_cache.subscribe().await;
        // `add_initial_events` had an effect.
        assert_eq!(initial_events.len(), 1);
    }
}