// matrix_sdk/event_cache/mod.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The event cache is an abstraction layer, sitting between the Rust SDK and a
//! final client, that acts as a global observer of all the rooms, gathering and
//! inferring some extra useful information about each room. In particular, this
//! doesn't require subscribing to a specific room to get access to this
//! information.
//!
//! It's intended to be fast, robust and easy to maintain, having learned from
//! previous endeavours at implementing middle to high level features elsewhere
//! in the SDK, notably in the UI's Timeline object.
//!
//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
//! details about the historical reasons that led us to start writing this.
#![forbid(missing_docs)]

use std::{
    collections::HashMap,
    fmt,
    sync::{Arc, OnceLock, RwLock as StdRwLock, RwLockReadGuard, RwLockWriteGuard},
};

use futures_util::future::try_join_all;
use matrix_sdk_base::{
    cross_process_lock::CrossProcessLockError,
    event_cache::store::{EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockState},
    linked_chunk::lazy_loader::LazyLoaderError,
    sync::RoomUpdates,
    task_monitor::BackgroundTaskHandle,
    timer,
};
use ruma::{OwnedRoomId, RoomId};
use tokio::sync::{
    Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock,
    broadcast::{Receiver, Sender, channel},
    mpsc,
};
use tracing::{error, instrument, trace};

use crate::{
    Client,
    client::{ClientInner, WeakClient},
    paginators::PaginatorError,
};

mod automatic_pagination;
mod caches;
mod deduplicator;
mod persistence;
#[cfg(feature = "e2e-encryption")]
mod redecryptor;
mod tasks;

use caches::{Caches, room::RoomEventCacheLinkedChunkUpdate};
pub use caches::{
    TimelineVectorDiffs,
    event_focused::EventFocusThreadMode,
    pagination::{BackPaginationOutcome, PaginationStatus},
    room::{
        RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheSubscriber,
        RoomEventCacheUpdate, pagination::RoomPagination,
    },
    thread::pagination::ThreadPagination,
};
#[cfg(feature = "e2e-encryption")]
pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport};

pub use crate::event_cache::automatic_pagination::AutomaticPagination;
/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Clone, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] instance hasn't been initialized with
    /// [`EventCache::subscribe`].
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// Room is not found.
    #[error("Room `{room_id}` is not found.")]
    RoomNotFound {
        /// The ID of the room that wasn't found.
        room_id: OwnedRoomId,
    },

    /// An error has been observed while back- or forward- paginating.
    #[error(transparent)]
    PaginationError(Arc<crate::Error>),

    /// An error has been observed while initiating an event-focused timeline.
    #[error(transparent)]
    InitialPaginationError(#[from] PaginatorError),

    /// An error happening when interacting with storage.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happening when attempting to (cross-process) lock storage.
    #[error(transparent)]
    LockingStorage(#[from] CrossProcessLockError),

    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
    /// to. It's possible this weak reference points to nothing anymore, at
    /// times where we try to use the client.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happening when interacting with the [`LinkedChunk`]'s lazy
    /// loader.
    ///
    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),

    /// An error happened when trying to load pinned events; none of them could
    /// be loaded, which would otherwise result in an empty pinned events
    /// list, incorrectly.
    #[error("Unable to load any of the pinned events.")]
    UnableToLoadPinnedEvents,

    /// An error happened when reading the metadata of a linked chunk, upon
    /// reload.
    #[error("the linked chunk metadata is invalid: {details}")]
    InvalidLinkedChunkMetadata {
        /// A string containing details about the error.
        details: String,
    },
}
/// A result using the [`EventCacheError`] as its error type.
pub type Result<T> = std::result::Result<T, EventCacheError>;
/// Holds handles to the tasks spawned by an [`EventCache`].
///
/// The tasks are spawned with `abort_on_drop`, so dropping this struct aborts
/// all of them.
pub struct EventCacheDropHandles {
    /// Task that listens to room updates.
    _listen_updates_task: BackgroundTaskHandle,

    /// Task that listens to updates to the user's ignored list.
    _ignore_user_list_update_task: BackgroundTaskHandle,

    /// The task used to automatically shrink the linked chunks.
    _auto_shrink_linked_chunk_task: BackgroundTaskHandle,

    /// A background task listening to room and send queue updates, and
    /// automatically subscribing the user to threads when needed, based on
    /// the semantics of MSC4306.
    ///
    /// One important constraint is that there is only one such task per
    /// [`EventCache`], so it does listen to *all* rooms at the same time.
    _thread_subscriber_task: BackgroundTaskHandle,

    /// A background task listening to room updates, and
    /// automatically handling search index operations add/remove/edit
    /// depending on the event type.
    ///
    /// One important constraint is that there is only one such task per
    /// [`EventCache`], so it does listen to *all* rooms at the same time.
    #[cfg(feature = "experimental-search")]
    _search_indexing_task: BackgroundTaskHandle,

    /// The task used to automatically redecrypt UTDs.
    #[cfg(feature = "e2e-encryption")]
    _redecryptor: redecryptor::Redecryptor,
}
180impl fmt::Debug for EventCacheDropHandles {
181    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
183    }
184}
185
/// An event cache, providing lots of useful functionality for clients.
///
/// Cloning is shallow, and thus is cheap to do.
///
/// See also the module-level comment.
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache; shared across all clones.
    inner: Arc<EventCacheInner>,
}
197impl fmt::Debug for EventCache {
198    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199        f.debug_struct("EventCache").finish_non_exhaustive()
200    }
201}
202
203impl EventCache {
204    /// Create a new [`EventCache`] for the given client.
205    pub(crate) fn new(client: &Arc<ClientInner>, event_cache_store: EventCacheStoreLock) -> Self {
206        let (generic_update_sender, _) = channel(128);
207        let (linked_chunk_update_sender, _) = channel(128);
208
209        let weak_client = WeakClient::from_inner(client);
210
211        let (thread_subscriber_sender, _thread_subscriber_receiver) = channel(128);
212
213        #[cfg(feature = "e2e-encryption")]
214        let redecryption_channels = redecryptor::RedecryptorChannels::new();
215
216        Self {
217            inner: Arc::new(EventCacheInner {
218                client: weak_client,
219                config: StdRwLock::new(EventCacheConfig::default()),
220                store: event_cache_store,
221                multiple_room_updates_lock: Default::default(),
222                by_room: Default::default(),
223                drop_handles: Default::default(),
224                auto_shrink_sender: Default::default(),
225                generic_update_sender,
226                linked_chunk_update_sender,
227                #[cfg(feature = "e2e-encryption")]
228                redecryption_channels,
229                automatic_pagination: OnceLock::new(),
230                thread_subscriber_sender,
231            }),
232        }
233    }
234
235    /// Get a read-only handle to the global configuration of the
236    /// [`EventCache`].
237    pub fn config(&self) -> RwLockReadGuard<'_, EventCacheConfig> {
238        self.inner.config.read().unwrap()
239    }
240
241    /// Get a writable handle to the global configuration of the [`EventCache`].
242    pub fn config_mut(&self) -> RwLockWriteGuard<'_, EventCacheConfig> {
243        self.inner.config.write().unwrap()
244    }
245
246    /// Subscribes to updates that a thread subscription has been sent.
247    ///
248    /// For testing purposes only.
249    #[cfg(feature = "testing")]
250    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
251        self.inner.thread_subscriber_sender.subscribe()
252    }
253
254    /// Starts subscribing the [`EventCache`] to sync responses, if not done
255    /// before.
256    ///
257    /// Re-running this has no effect if we already subscribed before, and is
258    /// cheap.
259    pub fn subscribe(&self) -> Result<()> {
260        let client = self.inner.client()?;
261
262        // Initialize the drop handles.
263        let _ = self.inner.drop_handles.get_or_init(|| {
264            let task_monitor = client.task_monitor();
265
266            // Spawn the task that will listen to all the room updates at once.
267            let listen_updates_task = task_monitor.spawn_infinite_task("event_cache::room_updates_task", tasks::room_updates_task(
268                self.inner.clone(),
269                client.subscribe_to_all_room_updates(),
270            )).abort_on_drop();
271
272            let ignore_user_list_update_task = task_monitor.spawn_infinite_task("event_cache::ignore_user_list_update_task", tasks::ignore_user_list_update_task(
273                self.inner.clone(),
274                client.subscribe_to_ignore_user_list_changes(),
275            )).abort_on_drop();
276
277            let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);
278
279            // Force-initialize the sender in the [`RoomEventCacheInner`].
280            self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);
281
282            let auto_shrink_linked_chunk_task = task_monitor.spawn_infinite_task("event_cache::auto_shrink_linked_chunk_task", tasks::auto_shrink_linked_chunk_task(
283                Arc::downgrade(&self.inner),
284                auto_shrink_receiver,
285            )).abort_on_drop();
286
287            #[cfg(feature = "e2e-encryption")]
288            let redecryptor = {
289                let receiver = self
290                    .inner
291                    .redecryption_channels
292                    .decryption_request_receiver
293                    .lock()
294                    .take()
295                    .expect("We should have initialized the channel an subscribing should happen only once");
296
297                redecryptor::Redecryptor::new(&client, Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender)
298            };
299
300        let thread_subscriber_task = client
301            .task_monitor()
302            .spawn_infinite_task(
303                "event_cache::thread_subscriber",
304                tasks::thread_subscriber_task(
305                    self.inner.client.clone(),
306                    self.inner.linked_chunk_update_sender.clone(),
307                    self.inner.thread_subscriber_sender.clone(),
308                ),
309            )
310            .abort_on_drop();
311
312        #[cfg(feature = "experimental-search")]
313        let search_indexing_task = client
314            .task_monitor()
315            .spawn_infinite_task(
316                "event_cache::search_indexing",
317                tasks::search_indexing_task(
318                    self.inner.client.clone(),
319                    self.inner.linked_chunk_update_sender.clone(),
320                ),
321            )
322            .abort_on_drop();
323
324            if self.config().experimental_auto_backpagination {
325                // Run the deferred initialization of the automatic pagination request sender, that
326                // is shared with every room.
327                trace!("spawning the automatic paginations API");
328                self.inner.automatic_pagination.get_or_init(|| AutomaticPagination::new(Arc::downgrade(&self.inner), task_monitor));
329            } else {
330                trace!("automatic paginations API is disabled");
331            }
332
333            Arc::new(EventCacheDropHandles {
334                _listen_updates_task: listen_updates_task,
335                _ignore_user_list_update_task: ignore_user_list_update_task,
336                _auto_shrink_linked_chunk_task: auto_shrink_linked_chunk_task,
337                #[cfg(feature = "e2e-encryption")]
338                _redecryptor: redecryptor,
339                _thread_subscriber_task: thread_subscriber_task,
340                #[cfg(feature = "experimental-search")]
341                _search_indexing_task: search_indexing_task,
342            })
343        });
344
345        Ok(())
346    }
347
348    /// For benchmarking purposes only.
349    #[doc(hidden)]
350    pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
351        self.inner.handle_room_updates(updates).await
352    }
353
354    /// Check whether [`EventCache::subscribe`] has been called.
355    pub fn has_subscribed(&self) -> bool {
356        self.inner.drop_handles.get().is_some()
357    }
358
359    /// Return a room-specific view over the [`EventCache`].
360    pub(crate) async fn for_room(
361        &self,
362        room_id: &RoomId,
363    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
364        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
365            return Err(EventCacheError::NotSubscribedYet);
366        };
367
368        let room = self.inner.all_caches_for_room(room_id).await?.room.clone();
369
370        Ok((room, drop_handles))
371    }
372
373    /// Cleanly clear all the rooms' event caches.
374    ///
375    /// This will notify any live observers that the room has been cleared.
376    pub async fn clear_all_rooms(&self) -> Result<()> {
377        self.inner.clear_all_rooms().await
378    }
379
380    /// Subscribe to room _generic_ updates.
381    ///
382    /// If one wants to listen what has changed in a specific room, the
383    /// [`RoomEventCache::subscribe`] is recommended. However, the
384    /// [`RoomEventCacheSubscriber`] type triggers side-effects.
385    ///
386    /// If one wants to get a high-overview, generic, updates for rooms, and
387    /// without side-effects, this method is recommended. Also, dropping the
388    /// receiver of this channel will not trigger any side-effect.
389    pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
390        self.inner.generic_update_sender.subscribe()
391    }
392
393    /// Returns a reference to the [`AutomaticPagination`] API, if enabled at
394    /// construction with the
395    /// [`EventCacheConfig::experimental_auto_backpagination`] flag.
396    pub fn automatic_pagination(&self) -> Option<AutomaticPagination> {
397        self.inner.automatic_pagination.get().cloned()
398    }
399}
400
/// Global configuration for the [`EventCache`], applied to every single room.
#[derive(Clone, Copy, Debug)]
pub struct EventCacheConfig {
    /// Maximum number of concurrent /event requests when loading pinned events.
    ///
    /// Defaults to [`EventCacheConfig::DEFAULT_MAX_CONCURRENT_REQUESTS`].
    pub max_pinned_events_concurrent_requests: usize,

    /// Maximum number of pinned events to load, for any room.
    ///
    /// Defaults to [`EventCacheConfig::DEFAULT_MAX_EVENTS_TO_LOAD`].
    pub max_pinned_events_to_load: usize,

    /// Whether to automatically backpaginate a room under certain conditions.
    ///
    /// Off by default.
    pub experimental_auto_backpagination: bool,

    /// The maximum number of allowed room paginations, for a given room, that
    /// can be executed in the automatic paginations task.
    ///
    /// After that number of paginations, the task will stop executing
    /// paginations for that room *in the background* (user-requested
    /// paginations will still be executed, of course).
    ///
    /// Defaults to [`EventCacheConfig::DEFAULT_ROOM_PAGINATION_CREDITS`].
    pub room_pagination_per_room_credit: usize,

    /// The number of messages to paginate in a single batch, when executing an
    /// automatic pagination request.
    ///
    /// Defaults to [`EventCacheConfig::DEFAULT_ROOM_PAGINATION_BATCH_SIZE`].
    pub room_pagination_batch_size: u16,
}
impl EventCacheConfig {
    /// The default maximum number of pinned events to load.
    ///
    /// See [`EventCacheConfig::max_pinned_events_to_load`].
    pub const DEFAULT_MAX_EVENTS_TO_LOAD: usize = 128;

    /// The default maximum number of concurrent requests to perform when
    /// loading the pinned events.
    ///
    /// See [`EventCacheConfig::max_pinned_events_concurrent_requests`].
    pub const DEFAULT_MAX_CONCURRENT_REQUESTS: usize = 8;

    /// The default number of credits to give to a room for automatic
    /// paginations (see also
    /// [`EventCacheConfig::room_pagination_per_room_credit`]).
    pub const DEFAULT_ROOM_PAGINATION_CREDITS: usize = 20;

    /// The default number of messages to paginate in a single batch, when
    /// executing an automatic pagination request (see also
    /// [`EventCacheConfig::room_pagination_batch_size`]).
    pub const DEFAULT_ROOM_PAGINATION_BATCH_SIZE: u16 = 30;
}
451impl Default for EventCacheConfig {
452    fn default() -> Self {
453        Self {
454            max_pinned_events_concurrent_requests: Self::DEFAULT_MAX_CONCURRENT_REQUESTS,
455            max_pinned_events_to_load: Self::DEFAULT_MAX_EVENTS_TO_LOAD,
456            room_pagination_per_room_credit: Self::DEFAULT_ROOM_PAGINATION_CREDITS,
457            room_pagination_batch_size: Self::DEFAULT_ROOM_PAGINATION_BATCH_SIZE,
458            experimental_auto_backpagination: false,
459        }
460    }
461}
462
/// Map of all the per-room caches, keyed by room ID.
type CachesByRoom = HashMap<OwnedRoomId, Caches>;
/// The shared, inner state of an [`EventCache`].
struct EventCacheInner {
    /// A weak reference to the inner client, useful when trying to get a handle
    /// on the owning client.
    client: WeakClient,

    /// Global configuration for the event cache.
    config: StdRwLock<EventCacheConfig>,

    /// Reference to the underlying store.
    store: EventCacheStoreLock,

    /// A lock used when many rooms must be updated at once.
    ///
    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
    /// ensure that multiple updates will be applied in the correct order, which
    /// is enforced by taking this lock when handling an update.
    // TODO: that's the place to add a cross-process lock!
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-filled cache of live [`RoomEventCache`], once per room.
    //
    // It's behind an `Arc` to get owned locks.
    by_room: Arc<RwLock<CachesByRoom>>,

    /// Handles to keep alive the task listening to updates.
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// A sender for notifications that a room *may* need to be auto-shrunk.
    ///
    /// Needs to live here, so it may be passed to each [`RoomEventCache`]
    /// instance.
    ///
    /// It's a `OnceLock` because its initialization is deferred to
    /// [`EventCache::subscribe`].
    ///
    /// See doc comment of the auto-shrink task in the `tasks` module.
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,

    /// A sender for room generic update.
    ///
    /// See doc comment of [`RoomEventCacheGenericUpdate`] and
    /// [`EventCache::subscribe_to_room_generic_updates`].
    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

    /// A sender for a persisted linked chunk update.
    ///
    /// This is used to notify that some linked chunk has persisted some updates
    /// to a store, during sync or a back-pagination of *any* linked chunk.
    /// This can be used by observers to look for new events.
    ///
    /// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// A sender on which a unit value is broadcast every time the thread
    /// subscriber task subscribed to a new thread.
    ///
    /// This is helpful for tests to coordinate that a new thread subscription
    /// has been sent or not.
    thread_subscriber_sender: Sender<()>,

    /// Channels used to communicate with the redecryption machinery; see the
    /// [`redecryptor`] module.
    #[cfg(feature = "e2e-encryption")]
    redecryption_channels: redecryptor::RedecryptorChannels,

    /// State for the automatic pagination mechanism.
    ///
    /// Depends on the [`EventCacheConfig::experimental_auto_backpagination`]
    /// flag to be set at subscription time.
    automatic_pagination: OnceLock<AutomaticPagination>,
}
/// Payload sent on the auto-shrink channel: the ID of the room that may need
/// to be shrunk.
type AutoShrinkChannelPayload = OwnedRoomId;
impl EventCacheInner {
    /// Upgrade the weak client reference, or fail with
    /// [`EventCacheError::ClientDropped`] if the owning client is gone.
    fn client(&self) -> Result<Client> {
        self.client.get().ok_or(EventCacheError::ClientDropped)
    }

    /// Clears all the room's data.
    async fn clear_all_rooms(&self) -> Result<()> {
        // Okay, here's where things get complicated.
        //
        // On the one hand, `by_room` may include storage for *some* rooms that we know
        // about, but not *all* of them. Any room that hasn't been loaded in the
        // client, or touched by a sync, will remain unloaded in memory, so it
        // will be missing from `self.by_room`. As a result, we need to make
        // sure that we're hitting the storage backend to *really* clear all the
        // rooms, including those that haven't been loaded yet.
        //
        // On the other hand, one must NOT clear the `by_room` map, because if someone
        // subscribed to a room update, they would never get any new update for
        // that room, since re-creating the `RoomEventCache` would create a new,
        // unrelated sender.
        //
        // So we need to *keep* the rooms in `by_room` alive, while clearing them in the
        // store backend.
        //
        // As a result, for a short while, the in-memory linked chunks
        // will be desynchronized from the storage. We need to be careful then. During
        // that short while, we don't want *anyone* to touch the linked chunk
        // (be it in memory or in the storage).
        //
        // And since that requirement applies to *any* room in `by_room` at the same
        // time, we'll have to take the locks for *all* the live rooms, so as to
        // properly clear the underlying storage.
        //
        // At this point, you might be scared about the potential for deadlocking. I am
        // as well, but I'm convinced we're fine:
        //
        // 1. the lock for `by_room` is usually held only for a short while, and
        //    independently of the other two kinds.
        // 2. the state may acquire the store cross-process lock internally, but only
        //    while the state's methods are called (so it's always transient). As a
        //    result, as soon as we've acquired the state locks, the store lock ought to
        //    be free.
        // 3. The store lock is held explicitly only in a small scoped area below.
        // 4. Then the store lock will be held internally when calling `reset_all()`,
        //    but at this point it's only held for a short while each time, so rooms
        //    will take turn to acquire it.

        let mut all_caches = self.by_room.write().await;

        // Prepare to reset all the caches: it ensures nobody is accessing or mutating
        // them.
        let resets =
            try_join_all(all_caches.values_mut().map(|caches| caches.prepare_to_reset())).await?;

        // Clear the storage for all the rooms, using the storage facility.
        // Clean or dirty, the lock state doesn't matter here: both branches proceed
        // identically since we're about to wipe all the linked chunks anyway.
        let store_guard = match self.store.lock().await? {
            EventCacheStoreLockState::Clean(store_guard) => store_guard,
            EventCacheStoreLockState::Dirty(store_guard) => store_guard,
        };
        store_guard.clear_all_linked_chunks().await?;

        // At this point, all the in-memory linked chunks are desynchronized from their
        // storages. Resynchronize them manually by resetting them.
        try_join_all(resets.into_iter().map(|reset_cache| reset_cache.reset_all())).await?;

        Ok(())
    }

    /// Handles a single set of room updates at once.
    #[instrument(skip(self, updates))]
    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        // First, take the lock that indicates we're processing updates, to avoid
        // handling multiple updates concurrently.
        let _lock = {
            let _timer = timer!("Taking the `multiple_room_updates_lock`");
            self.multiple_room_updates_lock.lock().await
        };

        // NOTE: We tried to make this concurrent at some point, but it turned out to be
        // a performance regression, even for large sync updates. Lacking time
        // to investigate, this code remains sequential for now. See also
        // https://github.com/matrix-org/matrix-rust-sdk/pull/5426.

        // Left rooms.
        for (room_id, left_room_update) in updates.left {
            let Ok(caches) = self.all_caches_for_room(&room_id).await else {
                error!(?room_id, "Room must exist");
                continue;
            };

            if let Err(err) = caches.handle_left_room_update(left_room_update).await {
                // Non-fatal error, try to continue to the next room.
                error!("handling left room update: {err}");
            }
        }

        // Joined rooms.
        for (room_id, joined_room_update) in updates.joined {
            trace!(?room_id, "Handling a `JoinedRoomUpdate`");

            let Ok(caches) = self.all_caches_for_room(&room_id).await else {
                error!(?room_id, "Room must exist");
                continue;
            };

            if let Err(err) = caches.handle_joined_room_update(joined_room_update).await {
                // Non-fatal error, try to continue to the next room.
                error!(%room_id, "handling joined room update: {err}");
            }
        }

        // Invited rooms.
        // TODO: we don't do anything with `updates.invite` at this point.

        Ok(())
    }

    /// Return all the event caches associated to a specific room.
    ///
    /// Lazily creates the room's `Caches` entry on first access.
    async fn all_caches_for_room(
        &self,
        room_id: &RoomId,
    ) -> Result<OwnedRwLockReadGuard<CachesByRoom, Caches>> {
        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
        // write lock.
        match OwnedRwLockReadGuard::try_map(self.by_room.clone().read_owned().await, |by_room| {
            by_room.get(room_id)
        }) {
            Ok(caches) => Ok(caches),

            Err(by_room_guard) => {
                // Slow-path: the entry doesn't exist; let's acquire a write lock.
                drop(by_room_guard);
                let by_room_guard = self.by_room.clone().write_owned().await;

                // In the meanwhile, some other caller might have obtained write access and done
                // the same, so check for existence again.
                let mut by_room_guard =
                    match OwnedRwLockWriteGuard::try_downgrade_map(by_room_guard, |by_room| {
                        by_room.get(room_id)
                    }) {
                        Ok(caches) => return Ok(caches),
                        Err(by_room_guard) => by_room_guard,
                    };

                let caches = Caches::new(
                    &self.client,
                    room_id,
                    self.generic_update_sender.clone(),
                    self.linked_chunk_update_sender.clone(),
                    // SAFETY: we must have subscribed before reaching this code, otherwise
                    // something is very wrong.
                    self.auto_shrink_sender.get().cloned().expect(
                        "we must have called `EventCache::subscribe()` before calling here.",
                    ),
                    self.store.clone(),
                    self.automatic_pagination.get().cloned(),
                )
                .await?;

                by_room_guard.insert(room_id.to_owned(), caches);

                Ok(OwnedRwLockWriteGuard::try_downgrade_map(by_room_guard, |by_room| {
                    by_room.get(room_id)
                })
                .expect("`Caches` has just been inserted"))
            }
        }
    }
}
/// Indicate where events are coming from (their provenance).
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events are coming from a sync.
    Sync,

    /// Events are coming from pagination.
    Pagination,

    /// The cause of the change is purely internal to the cache.
    Cache,
}
#[cfg(test)]
mod tests {
    use std::{ops::Not, sync::Arc, time::Duration};

    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::{
        RoomState,
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
    };
    use matrix_sdk_test::{
        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
    };
    use ruma::{event_id, room_id, user_id};
    use tokio::time::sleep;

    use super::{EventCacheError, RoomEventCacheGenericUpdate};
    use crate::test_utils::{
        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
    };

    /// `EventCache::for_room` must fail with `NotSubscribedYet` until
    /// `subscribe()` has been called on the event cache.
    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // If I create a room event subscriber for a room before subscribing the event
        // cache,
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        // Then it fails, because one must explicitly call `.subscribe()` on the event
        // cache.
        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    /// `find_event` only looks up events within the queried room: events
    /// stored for another room must not be found.
    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
        client.base_client().get_or_create_room(room_id2, RoomState::Joined);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Insert two rooms with a few events.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        // Room 1 receives two timeline events ($1, $2)…
        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        // …while room 2 receives a single one ($3).
        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
        updates.joined.insert(room_id2.to_owned(), joined_room_update2);

        // Have the event cache handle them.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // We can find the events in a single room.
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.find_event(eid1).await.unwrap().unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.find_event(eid2).await.unwrap().unwrap();
        assert_event_matches_msg(&found2, "you");

        // Retrieving the event with id3 from the room which doesn't contain it will
        // fail…
        assert!(room_event_cache.find_event(eid3).await.unwrap().is_none());
    }

    /// An event saved via `save_events` is retrievable from the same
    /// room-wide cache through `find_event`.
    #[async_test]
    async fn test_save_event() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;

        // Retrieving the event at the room-wide cache works.
        assert!(room_event_cache.find_event(event_id).await.unwrap().is_some());
    }

    /// Loading a room with pre-existing data in the event cache store emits a
    /// `RoomEventCacheGenericUpdate`; loading an empty room emits nothing.
    #[async_test]
    async fn test_generic_update_when_loading_rooms() {
        // Create 2 rooms. One of them has data in the event cache storage.
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id_0 = room_id!("!raclette:patate.ch");
        let room_id_1 = room_id!("!fondue:patate.ch");

        let event_factory = EventFactory::new().room(room_id_0).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        // Seed the store for room 0 only: one chunk containing one event.
        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id_0),
                vec![
                    // Non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Room 0 has initial data, so it must trigger a generic update.
        {
            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();

            assert_matches!(
                generic_stream.recv().await,
                Ok(RoomEventCacheGenericUpdate { room_id }) => {
                    assert_eq!(room_id, room_id_0);
                }
            );
        }

        // Room 1 has NO initial data, so nothing should happen.
        {
            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();

            // `now_or_never` polls the stream once without blocking: no
            // pending update must be observed here.
            assert!(generic_stream.recv().now_or_never().is_none());
        }
    }

    /// Back-paginations emit a generic update only when they actually load
    /// new events; paginations over empty chunks stay silent.
    #[async_test]
    async fn test_generic_update_when_paginating_room() {
        // Create 1 room, with 4 chunks in the event cache storage.
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_factory = EventFactory::new().room(room_id).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        // Seed the store with, from oldest to newest: two empty chunks, then
        // two chunks holding one event each ($ev0, then $ev1).
        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // Empty chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // Empty chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    // Non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                    // Non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(2)),
                        new: ChunkIdentifier::new(3),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(3), 0),
                        items: vec![
                            event_factory
                                .text_msg("world")
                                .sender(user)
                                .event_id(event_id!("$ev1"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Room is initialised, it gets one event in the timeline.
        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let pagination = room_event_cache.pagination();

        // Paginate, it gets one new event in the timeline.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert_eq!(pagination_outcome.events.len(), 1);
        assert!(pagination_outcome.reached_start.not());
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        // Paginate, it gets zero new event in the timeline.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        // No events loaded (the chunk was empty): no generic update expected.
        assert!(pagination_outcome.events.is_empty());
        assert!(pagination_outcome.reached_start.not());
        assert!(generic_stream.recv().now_or_never().is_none());

        // Paginate once more. Just checking our scenario is correct.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.reached_start);
        assert!(generic_stream.recv().now_or_never().is_none());
    }

    /// `for_room` returns `RoomNotFound` for unknown rooms, and succeeds once
    /// the room exists in the base client.
    #[async_test]
    async fn test_for_room_when_room_is_not_found() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Room doesn't exist. It returns an error.
        assert_matches!(
            event_cache.for_room(room_id).await,
            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
                assert_eq!(room_id, not_found_room_id);
            }
        );

        // Now create the room.
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        // Room exists. Everything fine.
        assert!(event_cache.for_room(room_id).await.is_ok());
    }

    /// Test that the event cache does not create reference cycles or tasks that
    /// retain its reference indefinitely, preventing it from being deallocated.
    #[cfg(not(target_family = "wasm"))]
    #[async_test]
    async fn test_no_refcycle_event_cache_tasks() {
        let client = MockClientBuilder::new(None).build().await;

        // Wait for the init tasks to die.
        sleep(Duration::from_secs(1)).await;

        // With only the client holding the cache, exactly one strong
        // reference must remain.
        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
        assert_eq!(event_cache_weak.strong_count(), 1);

        // Scope the room subscription so its handles are dropped before the
        // client itself.
        {
            let room_id = room_id!("!room:example.org");

            // Have the client know the room.
            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Give a bit of time for background tasks to die.
        sleep(Duration::from_secs(1)).await;

        // No strong counts should exist now that the Client has been dropped.
        assert_eq!(
            event_cache_weak.strong_count(),
            0,
            "Too many strong references to the event cache {}",
            event_cache_weak.strong_count()
        );
    }
}