matrix_sdk/event_cache/mod.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The event cache is an abstraction layer, sitting between the Rust SDK and a
//! final client, that acts as a global observer of all the rooms, gathering and
//! inferring some extra useful information about each room. In particular, this
//! doesn't require subscribing to a specific room to get access to this
//! information.
//!
//! It's intended to be fast, robust and easy to maintain, having learned from
//! previous endeavours at implementing middle to high level features elsewhere
//! in the SDK, notably in the UI's Timeline object.
//!
//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
//! details about the historical reasons that led us to start writing this.

#![forbid(missing_docs)]

use std::{
    collections::BTreeMap,
    fmt::Debug,
    sync::{Arc, OnceLock},
};

use eyeball::{SharedObservable, Subscriber};
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
    deserialized_responses::{AmbiguityChange, TimelineEvent},
    event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
    linked_chunk::lazy_loader::LazyLoaderError,
    store_locks::LockStoreError,
    sync::RoomUpdates,
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use once_cell::sync::OnceCell;
use room::RoomEventCacheState;
use ruma::{
    events::{
        relation::RelationType,
        room::{message::Relation, redaction::SyncRoomRedactionEvent},
        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
        AnySyncTimelineEvent,
    },
    serde::Raw,
    EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
};
use tokio::sync::{
    broadcast::{error::RecvError, Receiver},
    mpsc, Mutex, RwLock,
};
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span};

use self::paginator::PaginatorError;
use crate::{client::WeakClient, Client};

mod deduplicator;
mod pagination;
mod room;

pub mod paginator;
pub use pagination::{PaginationToken, RoomPagination, RoomPaginationStatus};
pub use room::{RoomEventCache, RoomEventCacheListener};

/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] instance hasn't been initialized with
    /// [`EventCache::subscribe`]
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// An error has been observed while back-paginating.
    #[error(transparent)]
    BackpaginationError(#[from] PaginatorError),

    /// Back-pagination was already happening in a given room, where we tried to
    /// back-paginate again.
    #[error("We were already back-paginating.")]
    AlreadyBackpaginating,

    /// An error happening when interacting with storage.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happening when attempting to (cross-process) lock storage.
    #[error(transparent)]
    LockingStorage(#[from] LockStoreError),

    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
    /// to. It's possible this weak reference points to nothing anymore, at
    /// times where we try to use the client.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happening when interacting with the [`LinkedChunk`]'s lazy
    /// loader.
    ///
    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),
}

/// A result using the [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;

/// Hold handles to the tasks spawned by a [`RoomEventCache`].
pub struct EventCacheDropHandles {
    /// Task that listens to room updates.
    listen_updates_task: JoinHandle<()>,

    /// Task that listens to updates to the user's ignored list.
    ignore_user_list_update_task: JoinHandle<()>,

    /// The task used to automatically shrink the linked chunks.
    auto_shrink_linked_chunk_task: JoinHandle<()>,
}

impl Debug for EventCacheDropHandles {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
    }
}

impl Drop for EventCacheDropHandles {
    fn drop(&mut self) {
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
        self.auto_shrink_linked_chunk_task.abort();
    }
}

/// An event cache, providing lots of useful functionality for clients.
///
/// Cloning is shallow, and thus is cheap to do.
///
/// See also the module-level comment.
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache.
    inner: Arc<EventCacheInner>,
}

impl Debug for EventCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventCache").finish_non_exhaustive()
    }
}

impl EventCache {
    /// Create a new [`EventCache`] for the given client.
    pub(crate) fn new(client: WeakClient) -> Self {
        Self {
            inner: Arc::new(EventCacheInner {
                client,
                store: Default::default(),
                multiple_room_updates_lock: Default::default(),
                by_room: Default::default(),
                drop_handles: Default::default(),
                all_events: Default::default(),
                auto_shrink_sender: Default::default(),
            }),
        }
    }

    /// Enable storing updates to storage, and reload events from storage.
    ///
    /// Has an effect only the first time it's called. It's safe to call it
    /// multiple times.
    pub fn enable_storage(&self) -> Result<()> {
        let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| {
            let client = self.inner.client()?;
            Ok(client.event_cache_store().clone())
        })?;
        Ok(())
    }

    /// Check whether the storage is enabled or not.
    pub fn has_storage(&self) -> bool {
        self.inner.has_storage()
    }

    /// Starts subscribing the [`EventCache`] to sync responses, if not done
    /// before.
    ///
    /// Re-running this has no effect if we already subscribed before, and is
    /// cheap.
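    ///
    /// # Example
    ///
    /// A minimal usage sketch (marked `ignore`, so it isn't compiled as a doc
    /// test); it assumes an already logged-in `client` and a `room_id` in
    /// scope, in the spirit of the tests at the bottom of this module:
    ///
    /// ```ignore
    /// let event_cache = client.event_cache();
    ///
    /// // Subscribe once, before requesting any room-level view.
    /// event_cache.subscribe()?;
    ///
    /// // A per-room cache is then obtained through the room itself.
    /// let room = client.get_room(&room_id).expect("room should be known");
    /// let (room_event_cache, _drop_handles) = room.event_cache().await?;
    ///
    /// // Subscribing to the room cache returns the current events plus a
    /// // stream of `RoomEventCacheUpdate`s.
    /// let (initial_events, mut updates) = room_event_cache.subscribe().await;
    /// ```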
    pub fn subscribe(&self) -> Result<()> {
        let client = self.inner.client()?;

        let _ = self.inner.drop_handles.get_or_init(|| {
            // Spawn the task that will listen to all the room updates at once.
            let listen_updates_task = spawn(Self::listen_task(
                self.inner.clone(),
                client.subscribe_to_all_room_updates(),
            ));

            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
                self.inner.clone(),
                client.subscribe_to_ignore_user_list_changes(),
            ));

            let (tx, rx) = mpsc::channel(32);

            // Force-initialize the sender in the [`RoomEventCacheInner`].
            self.inner.auto_shrink_sender.get_or_init(|| tx);

            let auto_shrink_linked_chunk_tasks =
                spawn(Self::auto_shrink_linked_chunk_task(self.inner.clone(), rx));

            Arc::new(EventCacheDropHandles {
                listen_updates_task,
                ignore_user_list_update_task,
                auto_shrink_linked_chunk_task: auto_shrink_linked_chunk_tasks,
            })
        });

        Ok(())
    }

    /// Try to find an event by its ID in all the rooms.
    // Note: replace this with a select-by-id query when this is implemented in a
    // store.
    pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
        self.inner
            .all_events
            .read()
            .await
            .events
            .get(event_id)
            .map(|(_room_id, event)| event.clone())
    }

    /// Clear all the events from the immutable event cache.
    ///
    /// This keeps all the rooms along with their internal events linked chunks,
    /// but it clears the side immutable cache for events.
    ///
    /// As such, it doesn't emit any [`RoomEventCacheUpdate`], and it's expected
    /// to be only useful in testing contexts.
    // Note: replace this with a remove query when this is implemented in a
    // store.
    #[cfg(any(test, feature = "testing"))]
    pub async fn empty_immutable_cache(&self) {
        self.inner.all_events.write().await.events.clear();
    }

    #[instrument(skip_all)]
    async fn ignore_user_list_update_task(
        inner: Arc<EventCacheInner>,
        mut ignore_user_list_stream: Subscriber<Vec<String>>,
    ) {
        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
        span.follows_from(Span::current());

        async move {
            while ignore_user_list_stream.next().await.is_some() {
                info!("Received an ignore user list change");
                if let Err(err) = inner.clear_all_rooms().await {
                    error!("when clearing room storage after ignore user list change: {err}");
                }
            }
            info!("Ignore user list stream has closed");
        }
        .instrument(span)
        .await;
    }

    #[instrument(skip_all)]
    async fn listen_task(
        inner: Arc<EventCacheInner>,
        mut room_updates_feed: Receiver<RoomUpdates>,
    ) {
        trace!("Spawning the listen task");
        loop {
            match room_updates_feed.recv().await {
                Ok(updates) => {
                    if let Err(err) = inner.handle_room_updates(updates).await {
                        match err {
                            EventCacheError::ClientDropped => {
                                // The client has dropped, exit the listen task.
                                info!("Closing the event cache global listen task because client dropped");
                                break;
                            }
                            err => {
                                error!("Error when handling room updates: {err}");
                            }
                        }
                    }
                }

                Err(RecvError::Lagged(num_skipped)) => {
                    // Forget everything we know; we could have missed events, and we have
                    // no way to reconcile at the moment!
                    // TODO: implement Smart Matching™,
                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
                    if let Err(err) = inner.clear_all_rooms().await {
                        error!("when clearing storage after lag in listen_task: {err}");
                    }
                }

                Err(RecvError::Closed) => {
                    // The sender has shut down, exit.
                    info!("Closing the event cache global listen task because receiver closed");
                    break;
                }
            }
        }
    }

    /// Spawns the task that will listen to auto-shrink notifications.
    ///
    /// The auto-shrink mechanism works this way:
    ///
    /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
    ///   increment the active number of listeners to that room, aka
    ///   [`RoomEventCacheState::listener_count`].
    /// - When that subscriber is dropped, it will decrement that count; and
    ///   notify the task below if it reached 0.
    /// - The task spawned here, owned by the [`EventCacheInner`], will listen
    ///   to such notifications that a room may be shrunk. It will attempt an
    ///   auto-shrink, by letting the inner state decide whether this is a good
    ///   time to do so (new listeners might have spawned in the meanwhile).
    #[instrument(skip_all)]
    async fn auto_shrink_linked_chunk_task(
        inner: Arc<EventCacheInner>,
        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
    ) {
        while let Some(room_id) = rx.recv().await {
            trace!(for_room = %room_id, "received notification to shrink");

            let room = match inner.for_room(&room_id).await {
                Ok(room) => room,
                Err(err) => {
                    warn!(for_room = %room_id, "error when getting a RoomEventCache: {err}");
                    continue;
                }
            };

            trace!("waiting for state lock…");
            let mut state = room.inner.state.write().await;

            match state.auto_shrink_if_no_listeners().await {
                Ok(diffs) => {
                    if let Some(diffs) = diffs {
                        // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
                        // listeners, right? RIGHT? Especially because the state is guarded behind
                        // a lock.
                        //
                        // However, better safe than sorry, and it's cheap to send an update here,
                        // so let's do it!
                        if !diffs.is_empty() {
                            let _ = room.inner.sender.send(
                                RoomEventCacheUpdate::UpdateTimelineEvents {
                                    diffs,
                                    origin: EventsOrigin::Cache,
                                },
                            );
                        }
                    } else {
                        debug!("auto-shrinking didn't happen");
                    }
                }

                Err(err) => {
                    // There's not much we can do here, unfortunately.
                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
                }
            }
        }
    }

    /// Return a room-specific view over the [`EventCache`].
    pub(crate) async fn for_room(
        &self,
        room_id: &RoomId,
    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
            return Err(EventCacheError::NotSubscribedYet);
        };

        let room = self.inner.for_room(room_id).await?;

        Ok((room, drop_handles))
    }

    /// Add an initial set of events to the event cache, reloaded from a cache.
    ///
    /// TODO: temporary for API compat, as the event cache should take care of
    /// its own store.
    #[instrument(skip(self, events))]
    pub async fn add_initial_events(
        &self,
        room_id: &RoomId,
        events: Vec<TimelineEvent>,
        prev_batch: Option<String>,
    ) -> Result<()> {
        // If the event cache's storage has been enabled, do nothing.
        if self.inner.has_storage() {
            return Ok(());
        }

        let room_cache = self.inner.for_room(room_id).await?;

        // If the linked chunk already has at least one event, ignore this request, as
        // it should happen at most once per room.
        if !room_cache.inner.state.read().await.events().is_empty() {
            return Ok(());
        }

        // We could have received events during a previous sync; remove them all, since
        // we can't know where to insert the "initial events" with respect to
        // them.

        room_cache
            .inner
            .replace_all_events_by(
                events,
                prev_batch,
                Default::default(),
                Default::default(),
                EventsOrigin::Cache,
            )
            .await?;

        Ok(())
    }
}

type AllEventsMap = BTreeMap<OwnedEventId, (OwnedRoomId, TimelineEvent)>;
type RelationsMap = BTreeMap<OwnedEventId, BTreeMap<OwnedEventId, RelationType>>;

/// Cache wrapper containing both copies of received events and lists of event
/// ids related to them.
#[derive(Default, Clone)]
struct AllEventsCache {
    /// A cache of received events mapped by their event id.
    events: AllEventsMap,
    /// A cache of related event ids for an event id. The key is the original
    /// event id and the value a list of event ids related to it.
    relations: RelationsMap,
}

impl AllEventsCache {
    fn clear(&mut self) {
        self.events.clear();
        self.relations.clear();
    }

    /// If the event is related to another one, its id is added to the relations
    /// map.
    fn append_related_event(&mut self, event: &TimelineEvent) {
        // Handle and cache events and relations.
        let Ok(AnySyncTimelineEvent::MessageLike(ev)) = event.raw().deserialize() else {
            return;
        };

        // Handle redactions separately, as their logic is slightly different.
        if let AnySyncMessageLikeEvent::RoomRedaction(room_redaction) = &ev {
            let redacted_event_id = match room_redaction {
                SyncRoomRedactionEvent::Original(ev) => {
                    ev.content.redacts.as_ref().or(ev.redacts.as_ref())
                }
                SyncRoomRedactionEvent::Redacted(redacted_redaction) => {
                    redacted_redaction.content.redacts.as_ref()
                }
            };

            if let Some(redacted_event_id) = redacted_event_id {
                self.relations
                    .entry(redacted_event_id.to_owned())
                    .or_default()
                    .insert(ev.event_id().to_owned(), RelationType::Replacement);
            }

            return;
        }

        let relationship = match ev.original_content() {
            Some(AnyMessageLikeEventContent::RoomMessage(c)) => {
                if let Some(relation) = c.relates_to {
                    match relation {
                        Relation::Replacement(replacement) => {
                            Some((replacement.event_id, RelationType::Replacement))
                        }
                        Relation::Reply { in_reply_to } => {
                            Some((in_reply_to.event_id, RelationType::Reference))
                        }
                        Relation::Thread(thread) => Some((thread.event_id, RelationType::Thread)),
                        // Do nothing for custom
                        _ => None,
                    }
                } else {
                    None
                }
            }
            Some(AnyMessageLikeEventContent::PollResponse(c)) => {
                Some((c.relates_to.event_id, RelationType::Reference))
            }
            Some(AnyMessageLikeEventContent::PollEnd(c)) => {
                Some((c.relates_to.event_id, RelationType::Reference))
            }
            Some(AnyMessageLikeEventContent::UnstablePollResponse(c)) => {
                Some((c.relates_to.event_id, RelationType::Reference))
            }
            Some(AnyMessageLikeEventContent::UnstablePollEnd(c)) => {
                Some((c.relates_to.event_id, RelationType::Reference))
            }
            Some(AnyMessageLikeEventContent::Reaction(c)) => {
                Some((c.relates_to.event_id, RelationType::Annotation))
            }
            _ => None,
        };

        if let Some(relationship) = relationship {
            self.relations
                .entry(relationship.0)
                .or_default()
                .insert(ev.event_id().to_owned(), relationship.1);
        }
    }

    /// Looks for related event ids for the passed event id, and appends them to
    /// the `results` parameter. Then it'll recursively get the related
    /// event ids for those too.
    fn collect_related_events(
        &self,
        event_id: &EventId,
        filter: Option<&[RelationType]>,
    ) -> Vec<TimelineEvent> {
        let mut results = Vec::new();
        self.collect_related_events_rec(event_id, filter, &mut results);
        results
    }

    fn collect_related_events_rec(
        &self,
        event_id: &EventId,
        filter: Option<&[RelationType]>,
        results: &mut Vec<TimelineEvent>,
    ) {
        let Some(related_event_ids) = self.relations.get(event_id) else {
            return;
        };

        for (related_event_id, relation_type) in related_event_ids {
            if let Some(filter) = filter {
                if !filter.contains(relation_type) {
                    continue;
                }
            }

            // If the event was already added to the related ones, skip it.
            if results.iter().any(|event| {
                event.event_id().is_some_and(|added_related_event_id| {
                    added_related_event_id == *related_event_id
                })
            }) {
                continue;
            }

            if let Some((_, ev)) = self.events.get(related_event_id) {
                results.push(ev.clone());
                self.collect_related_events_rec(related_event_id, filter, results);
            }
        }
    }
}

struct EventCacheInner {
    /// A weak reference to the inner client, useful when trying to get a handle
    /// on the owning client.
    client: WeakClient,

    /// Reference to the underlying store.
    ///
    /// Set to none if we shouldn't use storage for reading / writing linked
    /// chunks.
    store: Arc<OnceCell<EventCacheStoreLock>>,

    /// A lock used when many rooms must be updated at once.
    ///
    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
    /// ensure that multiple updates will be applied in the correct order, which
    /// is enforced by taking this lock when handling an update.
    // TODO: that's the place to add a cross-process lock!
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-filled cache of live [`RoomEventCache`], once per room.
    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,

    /// All events, keyed by event id.
    ///
    /// Since events are immutable in Matrix, this is append-only — events can
    /// be updated, though (e.g. if it was encrypted before, and
    /// successfully decrypted later).
    ///
    /// This is shared between the [`EventCacheInner`] singleton and all
    /// [`RoomEventCacheInner`] instances.
    all_events: Arc<RwLock<AllEventsCache>>,

    /// Handles to keep alive the task listening to updates.
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// A sender for notifications that a room *may* need to be auto-shrunk.
    ///
    /// Needs to live here, so it may be passed to each [`RoomEventCache`]
    /// instance.
    ///
    /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,
}

type AutoShrinkChannelPayload = OwnedRoomId;

impl EventCacheInner {
    fn client(&self) -> Result<Client> {
        self.client.get().ok_or(EventCacheError::ClientDropped)
    }

    /// Has persistent storage been enabled for the event cache?
    fn has_storage(&self) -> bool {
        self.store.get().is_some()
    }

    /// Clears all the room's data.
    async fn clear_all_rooms(&self) -> Result<()> {
        // Note: one must NOT clear the `by_room` map, because if something subscribed
        // to a room update, they would never get any new update for that room, since
        // re-creating the `RoomEventCache` would create a new unrelated sender.

        // Note 2: we don't need to clear the [`Self::events`] map, because events are
        // immutable in the Matrix protocol.

        let rooms = self.by_room.write().await;
        for room in rooms.values() {
            room.clear().await?;
        }

        Ok(())
    }

    /// Handles a single set of room updates at once.
    #[instrument(skip(self, updates))]
    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        // First, take the lock that indicates we're processing updates, to avoid
        // handling multiple updates concurrently.
        let _lock = self.multiple_room_updates_lock.lock().await;

        // Left rooms.
        for (room_id, left_room_update) in updates.leave {
            let room = self.for_room(&room_id).await?;

            if let Err(err) =
                room.inner.handle_left_room_update(self.has_storage(), left_room_update).await
            {
                // Non-fatal error, try to continue to the next room.
                error!("handling left room update: {err}");
            }
        }

        // Joined rooms.
        for (room_id, joined_room_update) in updates.join {
            let room = self.for_room(&room_id).await?;

            if let Err(err) =
                room.inner.handle_joined_room_update(self.has_storage(), joined_room_update).await
            {
                // Non-fatal error, try to continue to the next room.
                error!(%room_id, "handling joined room update: {err}");
            }
        }

        // Invited rooms.
        // TODO: we don't do anything with `updates.invite` at this point.

        Ok(())
    }

    /// Return a room-specific view over the [`EventCache`].
    ///
    /// If the room isn't known to the client, a [`RoomEventCache`] is still
    /// created and cached for it, falling back to a default room version.
    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
        // write lock.
        let by_room_guard = self.by_room.read().await;

        match by_room_guard.get(room_id) {
            Some(room) => Ok(room.clone()),

            None => {
                // Slow-path: the entry doesn't exist; let's acquire a write lock.
                drop(by_room_guard);
                let mut by_room_guard = self.by_room.write().await;

                // In the meanwhile, some other caller might have obtained write access and done
                // the same, so check for existence again.
                if let Some(room) = by_room_guard.get(room_id) {
                    return Ok(room.clone());
                }

                let pagination_status =
                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });

                let room_version = self
                    .client
                    .get()
                    .and_then(|client| client.get_room(room_id))
                    .as_ref()
                    .map(|room| room.clone_info().room_version_or_default())
                    .unwrap_or_else(|| {
                        warn!("unknown room version for {room_id}, using default V1");
                        RoomVersionId::V1
                    });

                let room_state = RoomEventCacheState::new(
                    room_id.to_owned(),
                    room_version,
                    self.store.clone(),
                    pagination_status.clone(),
                )
                .await?;

                // SAFETY: we must have subscribed before reaching this code, otherwise
                // something is very wrong.
                let auto_shrink_sender =
                    self.auto_shrink_sender.get().cloned().expect(
                        "we must have called `EventCache::subscribe()` before calling here.",
                    );

                let room_event_cache = RoomEventCache::new(
                    self.client.clone(),
                    room_state,
                    pagination_status,
                    room_id.to_owned(),
                    self.all_events.clone(),
                    auto_shrink_sender,
                );

                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());

                Ok(room_event_cache)
            }
        }
    }
}

/// The result of a single back-pagination request.
#[derive(Debug)]
pub struct BackPaginationOutcome {
    /// Did the back-pagination reach the start of the timeline?
    pub reached_start: bool,

    /// All the events that have been returned in the back-pagination
    /// request.
    ///
    /// Events are presented in reverse order: the first element of the vec,
    /// if present, is the most "recent" event from the chunk (or
    /// technically, the last one in the topological ordering).
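    ///
    /// For instance, given an `outcome: BackPaginationOutcome`, processing the
    /// chunk oldest-first just means iterating it in reverse (a sketch, not
    /// compiled here):
    ///
    /// ```ignore
    /// for event in outcome.events.iter().rev() {
    ///     // Events now run from oldest to most recent.
    /// }
    /// ```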
    pub events: Vec<TimelineEvent>,
}

/// An update related to events that happened in a room.
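///
/// A consumer typically receives these from the receiver returned by
/// [`RoomEventCache::subscribe`], along the lines of the following sketch
/// (not compiled here; `room_event_cache` is assumed to be in scope):
///
/// ```ignore
/// let (initial_events, mut updates) = room_event_cache.subscribe().await;
///
/// while let Ok(update) = updates.recv().await {
///     match update {
///         RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
///             // Apply the `VectorDiff`s to a local copy of the timeline.
///         }
///         RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
///             // Move the fully-read marker to `event_id`.
///         }
///         _ => {}
///     }
/// }
/// ```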
#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
    /// The fully read marker has moved to a different event.
    MoveReadMarkerTo {
        /// Event at which the read marker is now pointing.
        event_id: OwnedEventId,
    },

    /// The members have changed.
    UpdateMembers {
        /// Collection of ambiguity changes that room member events trigger.
        ///
        /// This is a map of event ID of the `m.room.member` event to the
        /// details of the ambiguity change.
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    },

    /// The room has received updates for the timeline as _diffs_.
    UpdateTimelineEvents {
        /// Diffs to apply to the timeline.
        diffs: Vec<VectorDiff<TimelineEvent>>,

        /// Where the diffs are coming from.
        origin: EventsOrigin,
    },

    /// The room has received new ephemeral events.
    AddEphemeralEvents {
        /// XXX: this is temporary, until read receipts are handled in the event
        /// cache
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    },
}

/// Indicate where events are coming from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events are coming from a sync.
    Sync,

    /// Events are coming from pagination.
    Pagination,

    /// The cause of the change is purely internal to the cache.
    Cache,
}

#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::sync::{JoinedRoomUpdate, RoomUpdates, Timeline};
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{event_id, room_id, serde::Raw, user_id};
    use serde_json::json;

    use super::{EventCacheError, RoomEventCacheUpdate};
    use crate::test_utils::{assert_event_matches_msg, logged_in_client};

    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // If I create a room event subscriber for a room before subscribing the event
        // cache,
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        // Then it fails, because one must explicitly call `.subscribe()` on the event
        // cache.
        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    #[async_test]
    async fn test_uniq_read_marker() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();

        let (events, mut stream) = room_event_cache.subscribe().await;

        assert!(events.is_empty());

        // When sending the same read marker event multiple times,…
        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .inner
            .handle_joined_room_update(
                event_cache.inner.has_storage(),
                JoinedRoomUpdate { account_data, ..Default::default() },
            )
            .await
            .unwrap();

        // … there's only one read marker update.
        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        assert!(stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Insert two rooms with a few events.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.join.insert(room_id1.to_owned(), joined_room_update1);
        updates.join.insert(room_id2.to_owned(), joined_room_update2);

        // Have the event cache handle them.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // Now retrieve all the events one by one.
        let found1 = event_cache.event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = event_cache.event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        let found3 = event_cache.event(eid3).await.unwrap();
        assert_event_matches_msg(&found3, "bjr");

        // An unknown event won't be found.
        assert!(event_cache.event(event_id!("$unknown")).await.is_none());

        // Can also find events in a single room.
        client.base_client().get_or_create_room(room_id1, matrix_sdk_base::RoomState::Joined);
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        // Retrieving the event with id3 from the room which doesn't contain it will
        // fail…
        assert!(room_event_cache.event(eid3).await.is_none());
        // …but it doesn't fail at the client-wide level.
        assert!(event_cache.event(eid3).await.is_some());
    }

    #[async_test]
    async fn test_save_event_and_clear() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_event(f.text_msg("hey there").event_id(event_id).into()).await;

        // Retrieving the event at the room-wide cache works.
        assert!(room_event_cache.event(event_id).await.is_some());
        // Also at the client level.
        assert!(event_cache.event(event_id).await.is_some());

        event_cache.empty_immutable_cache().await;

        // After clearing, both fail to find the event.
        assert!(room_event_cache.event(event_id).await.is_none());
        assert!(event_cache.event(event_id).await.is_none());
    }

    #[async_test]
    async fn test_add_initial_events() {
        // TODO: remove this test when the event cache uses its own persistent storage.
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        event_cache
            .add_initial_events(room_id, vec![f.text_msg("hey").into()], None)
            .await
            .unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        let (initial_events, _) = room_event_cache.subscribe().await;
        // `add_initial_events` had an effect.
        assert_eq!(initial_events.len(), 1);
    }
}