// matrix_sdk/event_cache/mod.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The event cache is an abstraction layer, sitting between the Rust SDK and a
16//! final client, that acts as a global observer of all the rooms, gathering and
17//! inferring some extra useful information about each room. In particular, this
18//! doesn't require subscribing to a specific room to get access to this
19//! information.
20//!
21//! It's intended to be fast, robust and easy to maintain, having learned from
22//! previous endeavours at implementing middle to high level features elsewhere
23//! in the SDK, notably in the UI's Timeline object.
24//!
25//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
26//! details about the historical reasons that led us to start writing this.
27
28#![forbid(missing_docs)]
29
30use std::{
31    collections::HashMap,
32    fmt,
33    ops::{Deref, DerefMut},
34    sync::{Arc, OnceLock},
35};
36
37use futures_util::future::try_join_all;
38use matrix_sdk_base::{
39    cross_process_lock::CrossProcessLockError,
40    event_cache::store::{EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockState},
41    linked_chunk::lazy_loader::LazyLoaderError,
42    sync::RoomUpdates,
43    task_monitor::BackgroundTaskHandle,
44    timer,
45};
46use ruma::{OwnedRoomId, RoomId};
47use tokio::sync::{
48    Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock,
49    broadcast::{Receiver, Sender, channel},
50    mpsc,
51};
52use tracing::{error, instrument, trace};
53
54use crate::{
55    Client,
56    client::{ClientInner, WeakClient},
57    paginators::PaginatorError,
58};
59
60mod caches;
61mod deduplicator;
62mod persistence;
63#[cfg(feature = "e2e-encryption")]
64mod redecryptor;
65mod tasks;
66
67use caches::{Caches, room::RoomEventCacheLinkedChunkUpdate};
68pub use caches::{
69    TimelineVectorDiffs,
70    event_focused::EventFocusThreadMode,
71    pagination::{BackPaginationOutcome, PaginationStatus},
72    room::{
73        RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheSubscriber,
74        RoomEventCacheUpdate, pagination::RoomPagination,
75    },
76    thread::pagination::ThreadPagination,
77};
78#[cfg(feature = "e2e-encryption")]
79pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport};
80
/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Clone, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] instance hasn't been initialized with
    /// [`EventCache::subscribe`].
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// Room is not found.
    #[error("Room `{room_id}` is not found.")]
    RoomNotFound {
        /// The ID of the room not being found.
        room_id: OwnedRoomId,
    },

    /// An error has been observed while back- or forward- paginating.
    ///
    /// The inner error is kept behind an `Arc` so that this enum can remain
    /// `Clone`.
    #[error(transparent)]
    PaginationError(Arc<crate::Error>),

    /// An error has been observed while initiating an event-focused timeline.
    #[error(transparent)]
    InitialPaginationError(#[from] PaginatorError),

    /// An error happening when interacting with storage.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happening when attempting to (cross-process) lock storage.
    #[error(transparent)]
    LockingStorage(#[from] CrossProcessLockError),

    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
    /// to. It's possible this weak reference points to nothing anymore, at
    /// times where we try to use the client.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happening when interacting with the [`LinkedChunk`]'s lazy
    /// loader.
    ///
    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),

    /// An error happened when trying to load pinned events; none of them could
    /// be loaded, which would otherwise result in an empty pinned events
    /// list, incorrectly.
    #[error("Unable to load any of the pinned events.")]
    UnableToLoadPinnedEvents,

    /// An error happened when reading the metadata of a linked chunk, upon
    /// reload.
    #[error("the linked chunk metadata is invalid: {details}")]
    InvalidLinkedChunkMetadata {
        /// A string containing details about the error.
        details: String,
    },
}

/// A result using the [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;
144
/// Hold handles to the tasks spawned by a [`EventCache`].
///
/// Dropping this aborts those tasks, so they never outlive the cache.
pub struct EventCacheDropHandles {
    /// Task that listens to room updates.
    listen_updates_task: BackgroundTaskHandle,

    /// Task that listens to updates to the user's ignored list.
    ignore_user_list_update_task: BackgroundTaskHandle,

    /// The task used to automatically shrink the linked chunks.
    auto_shrink_linked_chunk_task: BackgroundTaskHandle,

    /// The task used to automatically redecrypt UTDs.
    ///
    /// NOTE(review): not explicitly aborted in the `Drop` impl below;
    /// presumably `Redecryptor`'s own `Drop` stops its tasks — confirm.
    #[cfg(feature = "e2e-encryption")]
    _redecryptor: redecryptor::Redecryptor,
}
160
161impl fmt::Debug for EventCacheDropHandles {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
164    }
165}
166
impl Drop for EventCacheDropHandles {
    fn drop(&mut self) {
        // Abort all the background tasks when the handles are dropped, so no
        // task outlives the `EventCache` that spawned it.
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
        self.auto_shrink_linked_chunk_task.abort();
    }
}
174
/// An event cache, providing lots of useful functionality for clients.
///
/// Cloning is shallow (only the inner `Arc` is cloned), and thus is cheap to
/// do.
///
/// See also the module-level comment.
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache, shared by all clones.
    inner: Arc<EventCacheInner>,
}
185
186impl fmt::Debug for EventCache {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        f.debug_struct("EventCache").finish_non_exhaustive()
189    }
190}
191
192impl EventCache {
193    /// Create a new [`EventCache`] for the given client.
194    pub(crate) fn new(client: &Arc<ClientInner>, event_cache_store: EventCacheStoreLock) -> Self {
195        let (generic_update_sender, _) = channel(128);
196        let (linked_chunk_update_sender, _) = channel(128);
197
198        let weak_client = WeakClient::from_inner(client);
199
200        let (thread_subscriber_sender, _thread_subscriber_receiver) = channel(128);
201        let thread_subscriber_task = client
202            .task_monitor
203            .spawn_background_task(
204                "event_cache::thread_subscriber",
205                tasks::thread_subscriber_task(
206                    weak_client.clone(),
207                    linked_chunk_update_sender.clone(),
208                    thread_subscriber_sender,
209                ),
210            )
211            .abort_on_drop();
212
213        #[cfg(feature = "experimental-search")]
214        let search_indexing_task = client
215            .task_monitor
216            .spawn_background_task(
217                "event_cache::search_indexing",
218                tasks::search_indexing_task(
219                    weak_client.clone(),
220                    linked_chunk_update_sender.clone(),
221                ),
222            )
223            .abort_on_drop();
224
225        #[cfg(feature = "e2e-encryption")]
226        let redecryption_channels = redecryptor::RedecryptorChannels::new();
227
228        Self {
229            inner: Arc::new(EventCacheInner {
230                client: weak_client,
231                config: RwLock::new(EventCacheConfig::default()),
232                store: event_cache_store,
233                multiple_room_updates_lock: Default::default(),
234                by_room: Default::default(),
235                drop_handles: Default::default(),
236                auto_shrink_sender: Default::default(),
237                generic_update_sender,
238                linked_chunk_update_sender,
239                _thread_subscriber_task: thread_subscriber_task,
240                #[cfg(feature = "experimental-search")]
241                _search_indexing_task: search_indexing_task,
242                #[cfg(feature = "e2e-encryption")]
243                redecryption_channels,
244                #[cfg(feature = "testing")]
245                thread_subscriber_receiver: _thread_subscriber_receiver,
246            }),
247        }
248    }
249
250    /// Get a read-only handle to the global configuration of the
251    /// [`EventCache`].
252    pub async fn config(&self) -> impl Deref<Target = EventCacheConfig> + '_ {
253        self.inner.config.read().await
254    }
255
256    /// Get a writable handle to the global configuration of the [`EventCache`].
257    pub async fn config_mut(&self) -> impl DerefMut<Target = EventCacheConfig> + '_ {
258        self.inner.config.write().await
259    }
260
261    /// Subscribes to updates that a thread subscription has been sent.
262    ///
263    /// For testing purposes only.
264    #[cfg(feature = "testing")]
265    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
266        self.inner.thread_subscriber_receiver.resubscribe()
267    }
268
269    /// Starts subscribing the [`EventCache`] to sync responses, if not done
270    /// before.
271    ///
272    /// Re-running this has no effect if we already subscribed before, and is
273    /// cheap.
274    pub fn subscribe(&self) -> Result<()> {
275        let client = self.inner.client()?;
276
277        // Initialize the drop handles.
278        let _ = self.inner.drop_handles.get_or_init(|| {
279            let task_monitor = client.task_monitor();
280
281            // Spawn the task that will listen to all the room updates at once.
282            let listen_updates_task = task_monitor.spawn_background_task("event_cache::room_updates_task", tasks::room_updates_task(
283                self.inner.clone(),
284                client.subscribe_to_all_room_updates(),
285            ));
286
287            let ignore_user_list_update_task = task_monitor.spawn_background_task("event_cache::ignore_user_list_update_task", tasks::ignore_user_list_update_task(
288                self.inner.clone(),
289                client.subscribe_to_ignore_user_list_changes(),
290            ));
291
292            let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);
293
294            // Force-initialize the sender in the [`RoomEventCacheInner`].
295            self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);
296
297            let auto_shrink_linked_chunk_task = task_monitor.spawn_background_task("event_cache::auto_shrink_linked_chunk_task", tasks::auto_shrink_linked_chunk_task(
298                Arc::downgrade(&self.inner),
299                auto_shrink_receiver,
300            ));
301
302            #[cfg(feature = "e2e-encryption")]
303            let redecryptor = {
304                let receiver = self
305                    .inner
306                    .redecryption_channels
307                    .decryption_request_receiver
308                    .lock()
309                    .take()
310                    .expect("We should have initialized the channel an subscribing should happen only once");
311
312                redecryptor::Redecryptor::new(&client, Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender)
313            };
314
315
316            Arc::new(EventCacheDropHandles {
317                listen_updates_task,
318                ignore_user_list_update_task,
319                auto_shrink_linked_chunk_task,
320                #[cfg(feature = "e2e-encryption")]
321                _redecryptor: redecryptor,
322            })
323        });
324
325        Ok(())
326    }
327
328    /// For benchmarking purposes only.
329    #[doc(hidden)]
330    pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
331        self.inner.handle_room_updates(updates).await
332    }
333
334    /// Check whether [`EventCache::subscribe`] has been called.
335    pub fn has_subscribed(&self) -> bool {
336        self.inner.drop_handles.get().is_some()
337    }
338
339    /// Return a room-specific view over the [`EventCache`].
340    pub(crate) async fn for_room(
341        &self,
342        room_id: &RoomId,
343    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
344        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
345            return Err(EventCacheError::NotSubscribedYet);
346        };
347
348        let room = self.inner.all_caches_for_room(room_id).await?.room.clone();
349
350        Ok((room, drop_handles))
351    }
352
353    /// Cleanly clear all the rooms' event caches.
354    ///
355    /// This will notify any live observers that the room has been cleared.
356    pub async fn clear_all_rooms(&self) -> Result<()> {
357        self.inner.clear_all_rooms().await
358    }
359
360    /// Subscribe to room _generic_ updates.
361    ///
362    /// If one wants to listen what has changed in a specific room, the
363    /// [`RoomEventCache::subscribe`] is recommended. However, the
364    /// [`RoomEventCacheSubscriber`] type triggers side-effects.
365    ///
366    /// If one wants to get a high-overview, generic, updates for rooms, and
367    /// without side-effects, this method is recommended. Also, dropping the
368    /// receiver of this channel will not trigger any side-effect.
369    pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
370        self.inner.generic_update_sender.subscribe()
371    }
372}
373
/// Global configuration for the [`EventCache`], applied to every single room.
#[derive(Clone, Copy, Debug)]
pub struct EventCacheConfig {
    /// Maximum number of concurrent /event requests when loading pinned events.
    pub max_pinned_events_concurrent_requests: usize,

    /// Maximum number of pinned events to load, for any room.
    pub max_pinned_events_to_load: usize,
}

impl EventCacheConfig {
    /// The default maximum number of pinned events to load.
    const DEFAULT_MAX_EVENTS_TO_LOAD: usize = 128;

    /// The default maximum number of concurrent requests to perform when
    /// loading the pinned events.
    const DEFAULT_MAX_CONCURRENT_REQUESTS: usize = 8;
}

impl Default for EventCacheConfig {
    /// Build a configuration using the default limits defined above.
    fn default() -> Self {
        EventCacheConfig {
            max_pinned_events_to_load: Self::DEFAULT_MAX_EVENTS_TO_LOAD,
            max_pinned_events_concurrent_requests: Self::DEFAULT_MAX_CONCURRENT_REQUESTS,
        }
    }
}
401
/// Map of per-room caches, keyed by room ID.
type CachesByRoom = HashMap<OwnedRoomId, Caches>;

/// Shared state behind the [`EventCache`] handle.
struct EventCacheInner {
    /// A weak reference to the inner client, useful when trying to get a handle
    /// on the owning client.
    client: WeakClient,

    /// Global configuration for the event cache.
    config: RwLock<EventCacheConfig>,

    /// Reference to the underlying store.
    store: EventCacheStoreLock,

    /// A lock used when many rooms must be updated at once.
    ///
    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
    /// ensure that multiple updates will be applied in the correct order, which
    /// is enforced by taking this lock when handling an update.
    // TODO: that's the place to add a cross-process lock!
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-filled cache of live [`RoomEventCache`], once per room.
    //
    // It's behind an `Arc` to get owned locks.
    by_room: Arc<RwLock<CachesByRoom>>,

    /// Handles to keep alive the task listening to updates.
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// A sender for notifications that a room *may* need to be auto-shrunk.
    ///
    /// Needs to live here, so it may be passed to each [`RoomEventCache`]
    /// instance.
    ///
    /// See doc comment of [`tasks::auto_shrink_linked_chunk_task`].
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,

    /// A sender for room generic updates.
    ///
    /// See doc comment of [`RoomEventCacheGenericUpdate`] and
    /// [`EventCache::subscribe_to_room_generic_updates`].
    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

    /// A sender for a persisted linked chunk update.
    ///
    /// This is used to notify that some linked chunk has persisted some updates
    /// to a store, during sync or a back-pagination of *any* linked chunk.
    /// This can be used by observers to look for new events.
    ///
    /// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// A background task listening to room and send queue updates, and
    /// automatically subscribing the user to threads when needed, based on
    /// the semantics of MSC4306.
    ///
    /// One important constraint is that there is only one such task per
    /// [`EventCache`], so it does listen to *all* rooms at the same time.
    _thread_subscriber_task: BackgroundTaskHandle,

    /// A background task listening to room updates, and
    /// automatically handling search index operations add/remove/edit
    /// depending on the event type.
    ///
    /// One important constraint is that there is only one such task per
    /// [`EventCache`], so it does listen to *all* rooms at the same time.
    #[cfg(feature = "experimental-search")]
    _search_indexing_task: BackgroundTaskHandle,

    /// A test helper receiver that will be emitted every time the thread
    /// subscriber task subscribed to a new thread.
    ///
    /// This is helpful for tests to coordinate that a new thread subscription
    /// has been sent or not.
    #[cfg(feature = "testing")]
    thread_subscriber_receiver: Receiver<()>,

    /// Channels used by the redecryptor to receive decryption retry requests.
    #[cfg(feature = "e2e-encryption")]
    redecryption_channels: redecryptor::RedecryptorChannels,
}

/// Payload sent over the auto-shrink channel: the room that may need shrinking.
type AutoShrinkChannelPayload = OwnedRoomId;
484
485impl EventCacheInner {
486    fn client(&self) -> Result<Client> {
487        self.client.get().ok_or(EventCacheError::ClientDropped)
488    }
489
490    /// Clears all the room's data.
491    async fn clear_all_rooms(&self) -> Result<()> {
492        // Okay, here's where things get complicated.
493        //
494        // On the one hand, `by_room` may include storage for *some* rooms that we know
495        // about, but not *all* of them. Any room that hasn't been loaded in the
496        // client, or touched by a sync, will remain unloaded in memory, so it
497        // will be missing from `self.by_room`. As a result, we need to make
498        // sure that we're hitting the storage backend to *really* clear all the
499        // rooms, including those that haven't been loaded yet.
500        //
501        // On the other hand, one must NOT clear the `by_room` map, because if someone
502        // subscribed to a room update, they would never get any new update for
503        // that room, since re-creating the `RoomEventCache` would create a new,
504        // unrelated sender.
505        //
506        // So we need to *keep* the rooms in `by_room` alive, while clearing them in the
507        // store backend.
508        //
509        // As a result, for a short while, the in-memory linked chunks
510        // will be desynchronized from the storage. We need to be careful then. During
511        // that short while, we don't want *anyone* to touch the linked chunk
512        // (be it in memory or in the storage).
513        //
514        // And since that requirement applies to *any* room in `by_room` at the same
515        // time, we'll have to take the locks for *all* the live rooms, so as to
516        // properly clear the underlying storage.
517        //
518        // At this point, you might be scared about the potential for deadlocking. I am
519        // as well, but I'm convinced we're fine:
520        //
521        // 1. the lock for `by_room` is usually held only for a short while, and
522        //    independently of the other two kinds.
523        // 2. the state may acquire the store cross-process lock internally, but only
524        //    while the state's methods are called (so it's always transient). As a
525        //    result, as soon as we've acquired the state locks, the store lock ought to
526        //    be free.
527        // 3. The store lock is held explicitly only in a small scoped area below.
528        // 4. Then the store lock will be held internally when calling `reset_all()`,
529        //    but at this point it's only held for a short while each time, so rooms
530        //    will take turn to acquire it.
531
532        let mut all_caches = self.by_room.write().await;
533
534        // Prepare to reset all the caches: it ensures nobody is accessing or mutating
535        // them.
536        let resets =
537            try_join_all(all_caches.values_mut().map(|caches| caches.prepare_to_reset())).await?;
538
539        // Clear the storage for all the rooms, using the storage facility.
540        let store_guard = match self.store.lock().await? {
541            EventCacheStoreLockState::Clean(store_guard) => store_guard,
542            EventCacheStoreLockState::Dirty(store_guard) => store_guard,
543        };
544        store_guard.clear_all_linked_chunks().await?;
545
546        // At this point, all the in-memory linked chunks are desynchronized from their
547        // storages. Resynchronize them manually by resetting them.
548        try_join_all(resets.into_iter().map(|reset_cache| reset_cache.reset_all())).await?;
549
550        Ok(())
551    }
552
553    /// Handles a single set of room updates at once.
554    #[instrument(skip(self, updates))]
555    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
556        // First, take the lock that indicates we're processing updates, to avoid
557        // handling multiple updates concurrently.
558        let _lock = {
559            let _timer = timer!("Taking the `multiple_room_updates_lock`");
560            self.multiple_room_updates_lock.lock().await
561        };
562
563        // NOTE: We tried to make this concurrent at some point, but it turned out to be
564        // a performance regression, even for large sync updates. Lacking time
565        // to investigate, this code remains sequential for now. See also
566        // https://github.com/matrix-org/matrix-rust-sdk/pull/5426.
567
568        // Left rooms.
569        for (room_id, left_room_update) in updates.left {
570            let Ok(caches) = self.all_caches_for_room(&room_id).await else {
571                error!(?room_id, "Room must exist");
572                continue;
573            };
574
575            if let Err(err) = caches.handle_left_room_update(left_room_update).await {
576                // Non-fatal error, try to continue to the next room.
577                error!("handling left room update: {err}");
578            }
579        }
580
581        // Joined rooms.
582        for (room_id, joined_room_update) in updates.joined {
583            trace!(?room_id, "Handling a `JoinedRoomUpdate`");
584
585            let Ok(caches) = self.all_caches_for_room(&room_id).await else {
586                error!(?room_id, "Room must exist");
587                continue;
588            };
589
590            if let Err(err) = caches.handle_joined_room_update(joined_room_update).await {
591                // Non-fatal error, try to continue to the next room.
592                error!(%room_id, "handling joined room update: {err}");
593            }
594        }
595
596        // Invited rooms.
597        // TODO: we don't anything with `updates.invite` at this point.
598
599        Ok(())
600    }
601
602    /// Return all the event caches associated to a specific room.
603    async fn all_caches_for_room(
604        &self,
605        room_id: &RoomId,
606    ) -> Result<OwnedRwLockReadGuard<CachesByRoom, Caches>> {
607        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
608        // write lock.
609        match OwnedRwLockReadGuard::try_map(self.by_room.clone().read_owned().await, |by_room| {
610            by_room.get(room_id)
611        }) {
612            Ok(caches) => Ok(caches),
613
614            Err(by_room_guard) => {
615                // Slow-path: the entry doesn't exist; let's acquire a write lock.
616                drop(by_room_guard);
617                let by_room_guard = self.by_room.clone().write_owned().await;
618
619                // In the meanwhile, some other caller might have obtained write access and done
620                // the same, so check for existence again.
621                let mut by_room_guard =
622                    match OwnedRwLockWriteGuard::try_downgrade_map(by_room_guard, |by_room| {
623                        by_room.get(room_id)
624                    }) {
625                        Ok(caches) => return Ok(caches),
626                        Err(by_room_guard) => by_room_guard,
627                    };
628
629                let caches = Caches::new(
630                    &self.client,
631                    room_id,
632                    self.generic_update_sender.clone(),
633                    self.linked_chunk_update_sender.clone(),
634                    // SAFETY: we must have subscribed before reaching this code, otherwise
635                    // something is very wrong.
636                    self.auto_shrink_sender.get().cloned().expect(
637                        "we must have called `EventCache::subscribe()` before calling here.",
638                    ),
639                    self.store.clone(),
640                )
641                .await?;
642
643                by_room_guard.insert(room_id.to_owned(), caches);
644
645                Ok(OwnedRwLockWriteGuard::try_downgrade_map(by_room_guard, |by_room| {
646                    by_room.get(room_id)
647                })
648                .expect("`Caches` has just been inserted"))
649            }
650        }
651    }
652}
653
/// Indicate where events are coming from.
///
/// NOTE(review): presumably attached to updates emitted by the cache so that
/// observers can tell what triggered a change — confirm against usage sites.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events are coming from a sync.
    Sync,

    /// Events are coming from pagination.
    Pagination,

    /// The cause of the change is purely internal to the cache.
    Cache,
}
666
667#[cfg(test)]
668mod tests {
669    use std::{ops::Not, sync::Arc, time::Duration};
670
671    use assert_matches::assert_matches;
672    use futures_util::FutureExt as _;
673    use matrix_sdk_base::{
674        RoomState,
675        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
676        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
677    };
678    use matrix_sdk_test::{
679        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
680    };
681    use ruma::{event_id, room_id, user_id};
682    use tokio::time::sleep;
683
684    use super::{EventCacheError, RoomEventCacheGenericUpdate};
685    use crate::test_utils::{
686        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
687    };
688
    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // If I create a room event subscriber for a room before subscribing the event
        // cache,
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        // Then it fails, because one must explicitly call `.subscribe()` on the event
        // cache first.
        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }
704
    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
        client.base_client().get_or_create_room(room_id2, RoomState::Joined);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Insert two rooms with a few events.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        // Two events for the first room…
        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        // …and a single event for the second room.
        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
        updates.joined.insert(room_id2.to_owned(), joined_room_update2);

        // Have the event cache handle them.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // We can find the events in a single room.
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.find_event(eid1).await.unwrap().unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.find_event(eid2).await.unwrap().unwrap();
        assert_event_matches_msg(&found2, "you");

        // Retrieving the event with id3 from the room which doesn't contain it
        // yields `None`: the per-room lookup doesn't leak events across rooms.
        assert!(room_event_cache.find_event(eid3).await.unwrap().is_none());
    }
765
    #[async_test]
    async fn test_save_event() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        // Save an event directly, without it coming from a sync or pagination.
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;

        // Retrieving the event at the room-wide cache works.
        assert!(room_event_cache.find_event(event_id).await.unwrap().is_some());
    }
786
787    #[async_test]
788    async fn test_generic_update_when_loading_rooms() {
789        // Create 2 rooms. One of them has data in the event cache storage.
790        let user = user_id!("@mnt_io:matrix.org");
791        let client = logged_in_client(None).await;
792        let room_id_0 = room_id!("!raclette:patate.ch");
793        let room_id_1 = room_id!("!fondue:patate.ch");
794
795        let event_factory = EventFactory::new().room(room_id_0).sender(user);
796
797        let event_cache = client.event_cache();
798        event_cache.subscribe().unwrap();
799
800        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
801        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
802
803        client
804            .event_cache_store()
805            .lock()
806            .await
807            .expect("Could not acquire the event cache lock")
808            .as_clean()
809            .expect("Could not acquire a clean event cache lock")
810            .handle_linked_chunk_updates(
811                LinkedChunkId::Room(room_id_0),
812                vec![
813                    // Non-empty items chunk.
814                    Update::NewItemsChunk {
815                        previous: None,
816                        new: ChunkIdentifier::new(0),
817                        next: None,
818                    },
819                    Update::PushItems {
820                        at: Position::new(ChunkIdentifier::new(0), 0),
821                        items: vec![
822                            event_factory
823                                .text_msg("hello")
824                                .sender(user)
825                                .event_id(event_id!("$ev0"))
826                                .into_event(),
827                        ],
828                    },
829                ],
830            )
831            .await
832            .unwrap();
833
834        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
835
836        // Room 0 has initial data, so it must trigger a generic update.
837        {
838            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();
839
840            assert_matches!(
841                generic_stream.recv().await,
842                Ok(RoomEventCacheGenericUpdate { room_id }) => {
843                    assert_eq!(room_id, room_id_0);
844                }
845            );
846        }
847
848        // Room 1 has NO initial data, so nothing should happen.
849        {
850            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();
851
852            assert!(generic_stream.recv().now_or_never().is_none());
853        }
854    }
855
856    #[async_test]
857    async fn test_generic_update_when_paginating_room() {
858        // Create 1 room, with 4 chunks in the event cache storage.
859        let user = user_id!("@mnt_io:matrix.org");
860        let client = logged_in_client(None).await;
861        let room_id = room_id!("!raclette:patate.ch");
862
863        let event_factory = EventFactory::new().room(room_id).sender(user);
864
865        let event_cache = client.event_cache();
866        event_cache.subscribe().unwrap();
867
868        client.base_client().get_or_create_room(room_id, RoomState::Joined);
869
870        client
871            .event_cache_store()
872            .lock()
873            .await
874            .expect("Could not acquire the event cache lock")
875            .as_clean()
876            .expect("Could not acquire a clean event cache lock")
877            .handle_linked_chunk_updates(
878                LinkedChunkId::Room(room_id),
879                vec![
880                    // Empty chunk.
881                    Update::NewItemsChunk {
882                        previous: None,
883                        new: ChunkIdentifier::new(0),
884                        next: None,
885                    },
886                    // Empty chunk.
887                    Update::NewItemsChunk {
888                        previous: Some(ChunkIdentifier::new(0)),
889                        new: ChunkIdentifier::new(1),
890                        next: None,
891                    },
892                    // Non-empty items chunk.
893                    Update::NewItemsChunk {
894                        previous: Some(ChunkIdentifier::new(1)),
895                        new: ChunkIdentifier::new(2),
896                        next: None,
897                    },
898                    Update::PushItems {
899                        at: Position::new(ChunkIdentifier::new(2), 0),
900                        items: vec![
901                            event_factory
902                                .text_msg("hello")
903                                .sender(user)
904                                .event_id(event_id!("$ev0"))
905                                .into_event(),
906                        ],
907                    },
908                    // Non-empty items chunk.
909                    Update::NewItemsChunk {
910                        previous: Some(ChunkIdentifier::new(2)),
911                        new: ChunkIdentifier::new(3),
912                        next: None,
913                    },
914                    Update::PushItems {
915                        at: Position::new(ChunkIdentifier::new(3), 0),
916                        items: vec![
917                            event_factory
918                                .text_msg("world")
919                                .sender(user)
920                                .event_id(event_id!("$ev1"))
921                                .into_event(),
922                        ],
923                    },
924                ],
925            )
926            .await
927            .unwrap();
928
929        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
930
931        // Room is initialised, it gets one event in the timeline.
932        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
933
934        assert_matches!(
935            generic_stream.recv().await,
936            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
937                assert_eq!(room_id, expected_room_id);
938            }
939        );
940
941        let pagination = room_event_cache.pagination();
942
943        // Paginate, it gets one new event in the timeline.
944        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();
945
946        assert_eq!(pagination_outcome.events.len(), 1);
947        assert!(pagination_outcome.reached_start.not());
948        assert_matches!(
949            generic_stream.recv().await,
950            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
951                assert_eq!(room_id, expected_room_id);
952            }
953        );
954
955        // Paginate, it gets zero new event in the timeline.
956        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();
957
958        assert!(pagination_outcome.events.is_empty());
959        assert!(pagination_outcome.reached_start.not());
960        assert!(generic_stream.recv().now_or_never().is_none());
961
962        // Paginate once more. Just checking our scenario is correct.
963        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();
964
965        assert!(pagination_outcome.reached_start);
966        assert!(generic_stream.recv().now_or_never().is_none());
967    }
968
969    #[async_test]
970    async fn test_for_room_when_room_is_not_found() {
971        let client = logged_in_client(None).await;
972        let room_id = room_id!("!raclette:patate.ch");
973
974        let event_cache = client.event_cache();
975        event_cache.subscribe().unwrap();
976
977        // Room doesn't exist. It returns an error.
978        assert_matches!(
979            event_cache.for_room(room_id).await,
980            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
981                assert_eq!(room_id, not_found_room_id);
982            }
983        );
984
985        // Now create the room.
986        client.base_client().get_or_create_room(room_id, RoomState::Joined);
987
988        // Room exists. Everything fine.
989        assert!(event_cache.for_room(room_id).await.is_ok());
990    }
991
992    /// Test that the event cache does not create reference cycles or tasks that
993    /// retain its reference indefinitely, preventing it from being deallocated.
994    #[cfg(not(target_family = "wasm"))]
995    #[async_test]
996    async fn test_no_refcycle_event_cache_tasks() {
997        let client = MockClientBuilder::new(None).build().await;
998
999        // Wait for the init tasks to die.
1000        sleep(Duration::from_secs(1)).await;
1001
1002        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
1003        assert_eq!(event_cache_weak.strong_count(), 1);
1004
1005        {
1006            let room_id = room_id!("!room:example.org");
1007
1008            // Have the client know the room.
1009            let response = SyncResponseBuilder::default()
1010                .add_joined_room(JoinedRoomBuilder::new(room_id))
1011                .build_sync_response();
1012            client.inner.base_client.receive_sync_response(response).await.unwrap();
1013
1014            client.event_cache().subscribe().unwrap();
1015
1016            let (_room_event_cache, _drop_handles) =
1017                client.get_room(room_id).unwrap().event_cache().await.unwrap();
1018        }
1019
1020        drop(client);
1021
1022        // Give a bit of time for background tasks to die.
1023        sleep(Duration::from_secs(1)).await;
1024
1025        // No strong counts should exist now that the Client has been dropped.
1026        assert_eq!(
1027            event_cache_weak.strong_count(),
1028            0,
1029            "Too many strong references to the event cache {}",
1030            event_cache_weak.strong_count()
1031        );
1032    }
1033}