// matrix_sdk/event_cache/mod.rs
// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The event cache is an abstraction layer, sitting between the Rust SDK and a
//! final client, that acts as a global observer of all the rooms, gathering and
//! inferring some extra useful information about each room. In particular, this
//! doesn't require subscribing to a specific room to get access to this
//! information.
//!
//! It's intended to be fast, robust and easy to maintain, having learned from
//! previous endeavours at implementing middle to high level features elsewhere
//! in the SDK, notably in the UI's Timeline object.
//!
//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
//! details about the historical reasons that led us to start writing this.

#![forbid(missing_docs)]
use std::{
    collections::HashMap,
    fmt,
    ops::{Deref, DerefMut},
    sync::{Arc, OnceLock},
};

use eyeball::SharedObservable;
use futures_util::future::{join_all, try_join_all};
use matrix_sdk_base::{
    ThreadingSupport,
    cross_process_lock::CrossProcessLockError,
    event_cache::store::{EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockState},
    linked_chunk::lazy_loader::LazyLoaderError,
    sync::RoomUpdates,
    task_monitor::BackgroundTaskHandle,
    timer,
};
use ruma::{OwnedRoomId, RoomId};
use tokio::sync::{
    Mutex, RwLock,
    broadcast::{Receiver, Sender, channel},
    mpsc,
};
use tracing::{error, instrument, trace};

use crate::{
    Client,
    client::{ClientInner, WeakClient},
};

mod caches;
mod deduplicator;
mod persistence;
#[cfg(feature = "e2e-encryption")]
mod redecryptor;
mod tasks;

use caches::room::{RoomEventCacheLinkedChunkUpdate, RoomEventCacheStateLock};
pub use caches::{
    TimelineVectorDiffs,
    pagination::{BackPaginationOutcome, PaginationStatus},
    room::{
        RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheSubscriber,
        RoomEventCacheUpdate, pagination::RoomPagination,
    },
};
#[cfg(feature = "e2e-encryption")]
pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport};
79
/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] instance hasn't been initialized with
    /// [`EventCache::subscribe`]
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// Room is not found.
    #[error("Room `{room_id}` is not found.")]
    RoomNotFound {
        /// The ID of the room not being found.
        room_id: OwnedRoomId,
    },

    /// An error has been observed while back-paginating.
    ///
    /// The inner error is boxed to keep the size of this enum small.
    #[error(transparent)]
    BackpaginationError(Box<crate::Error>),

    /// Back-pagination was already happening in a given room, where we tried to
    /// back-paginate again.
    #[error("We were already back-paginating.")]
    AlreadyBackpaginating,

    /// An error happening when interacting with storage.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happening when attempting to (cross-process) lock storage.
    #[error(transparent)]
    LockingStorage(#[from] CrossProcessLockError),

    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
    /// to. It's possible this weak reference points to nothing anymore, at
    /// times where we try to use the client.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happening when interacting with the [`LinkedChunk`]'s lazy
    /// loader.
    ///
    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),

    /// An error happened when trying to load pinned events; none of them could
    /// be loaded, which would otherwise result in an empty pinned events
    /// list, incorrectly.
    #[error("Unable to load any of the pinned events.")]
    UnableToLoadPinnedEvents,

    /// An error happened when reading the metadata of a linked chunk, upon
    /// reload.
    #[error("the linked chunk metadata is invalid: {details}")]
    InvalidLinkedChunkMetadata {
        /// A string containing details about the error.
        details: String,
    },
}
141
/// A result using the [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;
144
/// Hold handles to the tasks spawn by a [`EventCache`].
///
/// Dropping this aborts those tasks (see the `Drop` impl below).
pub struct EventCacheDropHandles {
    /// Task that listens to room updates.
    listen_updates_task: BackgroundTaskHandle,

    /// Task that listens to updates to the user's ignored list.
    ignore_user_list_update_task: BackgroundTaskHandle,

    /// The task used to automatically shrink the linked chunks.
    auto_shrink_linked_chunk_task: BackgroundTaskHandle,

    /// The task used to automatically redecrypt UTDs.
    #[cfg(feature = "e2e-encryption")]
    _redecryptor: redecryptor::Redecryptor,
}
160
161impl fmt::Debug for EventCacheDropHandles {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
164    }
165}
166
impl Drop for EventCacheDropHandles {
    fn drop(&mut self) {
        // Abort the background tasks so they don't outlive the event cache.
        // The redecryptor (when present) is dropped implicitly with `self`.
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
        self.auto_shrink_linked_chunk_task.abort();
    }
}
174
/// An event cache, providing lots of useful functionality for clients.
///
/// Cloning is shallow, and thus is cheap to do.
///
/// See also the module-level comment.
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache.
    ///
    /// The `Arc` is what makes cloning shallow: all clones share this state.
    inner: Arc<EventCacheInner>,
}
185
186impl fmt::Debug for EventCache {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        f.debug_struct("EventCache").finish_non_exhaustive()
189    }
190}
191
192impl EventCache {
193    /// Create a new [`EventCache`] for the given client.
194    pub(crate) fn new(client: &Arc<ClientInner>, event_cache_store: EventCacheStoreLock) -> Self {
195        let (generic_update_sender, _) = channel(128);
196        let (linked_chunk_update_sender, _) = channel(128);
197
198        let weak_client = WeakClient::from_inner(client);
199
200        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(128);
201        let thread_subscriber_task = client
202            .task_monitor
203            .spawn_background_task(
204                "event_cache::thread_subscriber",
205                tasks::thread_subscriber_task(
206                    weak_client.clone(),
207                    linked_chunk_update_sender.clone(),
208                    thread_subscriber_sender,
209                ),
210            )
211            .abort_on_drop();
212
213        #[cfg(feature = "experimental-search")]
214        let search_indexing_task = client
215            .task_monitor
216            .spawn_background_task(
217                "event_cache::search_indexing",
218                tasks::search_indexing_task(
219                    weak_client.clone(),
220                    linked_chunk_update_sender.clone(),
221                ),
222            )
223            .abort_on_drop();
224
225        #[cfg(feature = "e2e-encryption")]
226        let redecryption_channels = redecryptor::RedecryptorChannels::new();
227
228        Self {
229            inner: Arc::new(EventCacheInner {
230                client: weak_client,
231                config: RwLock::new(EventCacheConfig::default()),
232                store: event_cache_store,
233                multiple_room_updates_lock: Default::default(),
234                by_room: Default::default(),
235                drop_handles: Default::default(),
236                auto_shrink_sender: Default::default(),
237                generic_update_sender,
238                linked_chunk_update_sender,
239                _thread_subscriber_task: thread_subscriber_task,
240                #[cfg(feature = "experimental-search")]
241                _search_indexing_task: search_indexing_task,
242                #[cfg(feature = "e2e-encryption")]
243                redecryption_channels,
244                thread_subscriber_receiver,
245            }),
246        }
247    }
248
249    /// Get a read-only handle to the global configuration of the
250    /// [`EventCache`].
251    pub async fn config(&self) -> impl Deref<Target = EventCacheConfig> + '_ {
252        self.inner.config.read().await
253    }
254
255    /// Get a writable handle to the global configuration of the [`EventCache`].
256    pub async fn config_mut(&self) -> impl DerefMut<Target = EventCacheConfig> + '_ {
257        self.inner.config.write().await
258    }
259
260    /// Subscribes to updates that a thread subscription has been sent.
261    ///
262    /// For testing purposes only.
263    #[doc(hidden)]
264    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
265        self.inner.thread_subscriber_receiver.resubscribe()
266    }
267
268    /// Starts subscribing the [`EventCache`] to sync responses, if not done
269    /// before.
270    ///
271    /// Re-running this has no effect if we already subscribed before, and is
272    /// cheap.
273    pub fn subscribe(&self) -> Result<()> {
274        let client = self.inner.client()?;
275
276        // Initialize the drop handles.
277        let _ = self.inner.drop_handles.get_or_init(|| {
278            let task_monitor = client.task_monitor();
279
280            // Spawn the task that will listen to all the room updates at once.
281            let listen_updates_task = task_monitor.spawn_background_task("event_cache::room_updates_task", tasks::room_updates_task(
282                self.inner.clone(),
283                client.subscribe_to_all_room_updates(),
284            ));
285
286            let ignore_user_list_update_task = task_monitor.spawn_background_task("event_cache::ignore_user_list_update_task", tasks::ignore_user_list_update_task(
287                self.inner.clone(),
288                client.subscribe_to_ignore_user_list_changes(),
289            ));
290
291            let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);
292
293            // Force-initialize the sender in the [`RoomEventCacheInner`].
294            self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);
295
296            let auto_shrink_linked_chunk_task = task_monitor.spawn_background_task("event_cache::auto_shrink_linked_chunk_task", tasks::auto_shrink_linked_chunk_task(
297                Arc::downgrade(&self.inner),
298                auto_shrink_receiver,
299            ));
300
301            #[cfg(feature = "e2e-encryption")]
302            let redecryptor = {
303                let receiver = self
304                    .inner
305                    .redecryption_channels
306                    .decryption_request_receiver
307                    .lock()
308                    .take()
309                    .expect("We should have initialized the channel an subscribing should happen only once");
310
311                redecryptor::Redecryptor::new(&client, Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender)
312            };
313
314
315            Arc::new(EventCacheDropHandles {
316                listen_updates_task,
317                ignore_user_list_update_task,
318                auto_shrink_linked_chunk_task,
319                #[cfg(feature = "e2e-encryption")]
320                _redecryptor: redecryptor,
321            })
322        });
323
324        Ok(())
325    }
326
327    /// For benchmarking purposes only.
328    #[doc(hidden)]
329    pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
330        self.inner.handle_room_updates(updates).await
331    }
332
333    /// Check whether [`EventCache::subscribe`] has been called.
334    pub fn has_subscribed(&self) -> bool {
335        self.inner.drop_handles.get().is_some()
336    }
337
338    /// Return a room-specific view over the [`EventCache`].
339    pub(crate) async fn for_room(
340        &self,
341        room_id: &RoomId,
342    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
343        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
344            return Err(EventCacheError::NotSubscribedYet);
345        };
346
347        let room = self.inner.for_room(room_id).await?;
348
349        Ok((room, drop_handles))
350    }
351
352    /// Cleanly clear all the rooms' event caches.
353    ///
354    /// This will notify any live observers that the room has been cleared.
355    pub async fn clear_all_rooms(&self) -> Result<()> {
356        self.inner.clear_all_rooms().await
357    }
358
359    /// Subscribe to room _generic_ updates.
360    ///
361    /// If one wants to listen what has changed in a specific room, the
362    /// [`RoomEventCache::subscribe`] is recommended. However, the
363    /// [`RoomEventCacheSubscriber`] type triggers side-effects.
364    ///
365    /// If one wants to get a high-overview, generic, updates for rooms, and
366    /// without side-effects, this method is recommended. Also, dropping the
367    /// receiver of this channel will not trigger any side-effect.
368    pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
369        self.inner.generic_update_sender.subscribe()
370    }
371}
372
/// Global configuration for the [`EventCache`], applied to every single room.
#[derive(Clone, Copy, Debug)]
pub struct EventCacheConfig {
    /// Maximum number of concurrent /event requests when loading pinned events.
    pub max_pinned_events_concurrent_requests: usize,

    /// Maximum number of pinned events to load, for any room.
    pub max_pinned_events_to_load: usize,
}
382
impl EventCacheConfig {
    /// The default maximum number of pinned events to load.
    const DEFAULT_MAX_EVENTS_TO_LOAD: usize = 128;

    /// The default maximum number of concurrent requests to perform when
    /// loading the pinned events.
    const DEFAULT_MAX_CONCURRENT_REQUESTS: usize = 8;
}
391
392impl Default for EventCacheConfig {
393    fn default() -> Self {
394        Self {
395            max_pinned_events_concurrent_requests: Self::DEFAULT_MAX_CONCURRENT_REQUESTS,
396            max_pinned_events_to_load: Self::DEFAULT_MAX_EVENTS_TO_LOAD,
397        }
398    }
399}
400
/// Shared state behind [`EventCache`]; always accessed through an `Arc`.
struct EventCacheInner {
    /// A weak reference to the inner client, useful when trying to get a handle
    /// on the owning client.
    client: WeakClient,

    /// Global configuration for the event cache.
    config: RwLock<EventCacheConfig>,

    /// Reference to the underlying store.
    store: EventCacheStoreLock,

    /// A lock used when many rooms must be updated at once.
    ///
    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
    /// ensure that multiple updates will be applied in the correct order, which
    /// is enforced by taking this lock when handling an update.
    // TODO: that's the place to add a cross-process lock!
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-filled cache of live [`RoomEventCache`], once per room.
    by_room: RwLock<HashMap<OwnedRoomId, RoomEventCache>>,

    /// Handles to keep alive the task listening to updates.
    ///
    /// Initialized at most once, by [`EventCache::subscribe`].
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// A sender for notifications that a room *may* need to be auto-shrunk.
    ///
    /// Needs to live here, so it may be passed to each [`RoomEventCache`]
    /// instance.
    ///
    /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,

    /// A sender for room generic update.
    ///
    /// See doc comment of [`RoomEventCacheGenericUpdate`] and
    /// [`EventCache::subscribe_to_room_generic_updates`].
    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

    /// A sender for a persisted linked chunk update.
    ///
    /// This is used to notify that some linked chunk has persisted some updates
    /// to a store, during sync or a back-pagination of *any* linked chunk.
    /// This can be used by observers to look for new events.
    ///
    /// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// A background task listening to room and send queue updates, and
    /// automatically subscribing the user to threads when needed, based on
    /// the semantics of MSC4306.
    ///
    /// One important constraint is that there is only one such task per
    /// [`EventCache`], so it does listen to *all* rooms at the same time.
    _thread_subscriber_task: BackgroundTaskHandle,

    /// A background task listening to room updates, and
    /// automatically handling search index operations add/remove/edit
    /// depending on the event type.
    ///
    /// One important constraint is that there is only one such task per
    /// [`EventCache`], so it does listen to *all* rooms at the same time.
    #[cfg(feature = "experimental-search")]
    _search_indexing_task: BackgroundTaskHandle,

    /// A test helper receiver that will be emitted every time the thread
    /// subscriber task subscribed to a new thread.
    ///
    /// This is helpful for tests to coordinate that a new thread subscription
    /// has been sent or not.
    thread_subscriber_receiver: Receiver<()>,

    /// Channels used by the redecryptor to receive decryption retry requests.
    #[cfg(feature = "e2e-encryption")]
    redecryption_channels: redecryptor::RedecryptorChannels,
}
476
477type AutoShrinkChannelPayload = OwnedRoomId;
478
impl EventCacheInner {
    /// Upgrade the weak client reference, failing with
    /// [`EventCacheError::ClientDropped`] if the owning client is gone.
    fn client(&self) -> Result<Client> {
        self.client.get().ok_or(EventCacheError::ClientDropped)
    }

    /// Clears all the room's data.
    async fn clear_all_rooms(&self) -> Result<()> {
        // Okay, here's where things get complicated.
        //
        // On the one hand, `by_room` may include storage for *some* rooms that we know
        // about, but not *all* of them. Any room that hasn't been loaded in the
        // client, or touched by a sync, will remain unloaded in memory, so it
        // will be missing from `self.by_room`. As a result, we need to make
        // sure that we're hitting the storage backend to *really* clear all the
        // rooms, including those that haven't been loaded yet.
        //
        // On the other hand, one must NOT clear the `by_room` map, because if someone
        // subscribed to a room update, they would never get any new update for
        // that room, since re-creating the `RoomEventCache` would create a new,
        // unrelated sender.
        //
        // So we need to *keep* the rooms in `by_room` alive, while clearing them in the
        // store backend.
        //
        // As a result, for a short while, the in-memory linked chunks
        // will be desynchronized from the storage. We need to be careful then. During
        // that short while, we don't want *anyone* to touch the linked chunk
        // (be it in memory or in the storage).
        //
        // And since that requirement applies to *any* room in `by_room` at the same
        // time, we'll have to take the locks for *all* the live rooms, so as to
        // properly clear the underlying storage.
        //
        // At this point, you might be scared about the potential for deadlocking. I am
        // as well, but I'm convinced we're fine:
        // 1. the lock for `by_room` is usually held only for a short while, and
        //    independently of the other two kinds.
        // 2. the state may acquire the store cross-process lock internally, but only
        //    while the state's methods are called (so it's always transient). As a
        //    result, as soon as we've acquired the state locks, the store lock ought to
        //    be free.
        // 3. The store lock is held explicitly only in a small scoped area below.
        // 4. Then the store lock will be held internally when calling `reset()`, but at
        //    this point it's only held for a short while each time, so rooms will take
        //    turn to acquire it.

        let rooms = self.by_room.write().await;

        // Collect all the rooms' state locks, first: we can clear the storage only when
        // nobody will touch it at the same time.
        let room_locks =
            join_all(rooms.values().map(|room| async move { (room, room.state().write().await) }))
                .await;

        // Clear the storage for all the rooms, using the storage facility.
        // Note: the lock is used whether it's clean or dirty — clearing
        // everything is safe either way.
        let store_guard = match self.store.lock().await? {
            EventCacheStoreLockState::Clean(store_guard) => store_guard,
            EventCacheStoreLockState::Dirty(store_guard) => store_guard,
        };
        store_guard.clear_all_linked_chunks().await?;

        // At this point, all the in-memory linked chunks are desynchronized from the
        // storage. Resynchronize them manually by calling reset(), and
        // propagate updates to observers.
        try_join_all(room_locks.into_iter().map(|(room, state_guard)| async move {
            let mut state_guard = state_guard?;
            let updates_as_vector_diffs = state_guard.reset().await?;

            room.update_sender().send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: updates_as_vector_diffs,
                    origin: EventsOrigin::Cache,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: room.room_id().to_owned() }),
            );

            Ok::<_, EventCacheError>(())
        }))
        .await?;

        Ok(())
    }

    /// Handles a single set of room updates at once.
    #[instrument(skip(self, updates))]
    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        // First, take the lock that indicates we're processing updates, to avoid
        // handling multiple updates concurrently.
        let _lock = {
            let _timer = timer!("Taking the `multiple_room_updates_lock`");
            self.multiple_room_updates_lock.lock().await
        };

        // NOTE: bnjbvr tried to make this concurrent at some point, but it turned out
        // to be a performance regression, even for large sync updates. Lacking
        // time to investigate, this code remains sequential for now. See also
        // https://github.com/matrix-org/matrix-rust-sdk/pull/5426.

        // Left rooms.
        for (room_id, left_room_update) in updates.left {
            let Ok(room) = self.for_room(&room_id).await else {
                error!(?room_id, "Room must exist");
                continue;
            };

            if let Err(err) = room.handle_left_room_update(left_room_update).await {
                // Non-fatal error, try to continue to the next room.
                error!("handling left room update: {err}");
            }
        }

        // Joined rooms.
        for (room_id, joined_room_update) in updates.joined {
            trace!(?room_id, "Handling a `JoinedRoomUpdate`");

            let Ok(room) = self.for_room(&room_id).await else {
                error!(?room_id, "Room must exist");
                continue;
            };

            if let Err(err) = room.handle_joined_room_update(joined_room_update).await {
                // Non-fatal error, try to continue to the next room.
                error!(%room_id, "handling joined room update: {err}");
            }
        }

        // Invited rooms.
        // TODO: we don't do anything with `updates.invite` at this point.

        Ok(())
    }

    /// Return a room-specific view over the [`EventCache`].
    ///
    /// Lazily creates (and caches in `by_room`) the [`RoomEventCache`] on
    /// first access for a given room.
    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
        // write lock.
        let by_room_guard = self.by_room.read().await;

        match by_room_guard.get(room_id) {
            Some(room) => Ok(room.clone()),

            None => {
                // Slow-path: the entry doesn't exist; let's acquire a write lock.
                drop(by_room_guard);
                let mut by_room_guard = self.by_room.write().await;

                // In the meanwhile, some other caller might have obtained write access and done
                // the same, so check for existence again.
                if let Some(room) = by_room_guard.get(room_id) {
                    return Ok(room.clone());
                }

                let pagination_status =
                    SharedObservable::new(PaginationStatus::Idle { hit_timeline_start: false });

                let Some(client) = self.client.get() else {
                    return Err(EventCacheError::ClientDropped);
                };

                let room = client
                    .get_room(room_id)
                    .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
                let room_version_rules = room.clone_info().room_version_rules_or_default();

                let enabled_thread_support = matches!(
                    client.base_client().threading_support,
                    ThreadingSupport::Enabled { .. }
                );

                let update_sender = caches::room::RoomEventCacheUpdateSender::new(
                    self.generic_update_sender.clone(),
                );

                let own_user_id =
                    client.user_id().expect("the user must be logged in, at this point").to_owned();
                let room_state = RoomEventCacheStateLock::new(
                    own_user_id,
                    room_id.to_owned(),
                    room_version_rules,
                    enabled_thread_support,
                    update_sender.clone(),
                    self.linked_chunk_update_sender.clone(),
                    self.store.clone(),
                    pagination_status.clone(),
                )
                .await?;

                let timeline_is_not_empty =
                    room_state.read().await?.room_linked_chunk().revents().next().is_some();

                // SAFETY: we must have subscribed before reaching this code, otherwise
                // something is very wrong.
                let auto_shrink_sender =
                    self.auto_shrink_sender.get().cloned().expect(
                        "we must have called `EventCache::subscribe()` before calling here.",
                    );

                let room_event_cache = RoomEventCache::new(
                    self.client.clone(),
                    room_state,
                    pagination_status,
                    room_id.to_owned(),
                    auto_shrink_sender,
                    update_sender,
                );

                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());

                // If at least one event has been loaded, it means there is a timeline. Let's
                // emit a generic update.
                if timeline_is_not_empty {
                    let _ = self
                        .generic_update_sender
                        .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
                }

                Ok(room_event_cache)
            }
        }
    }
}
700
/// Indicate where events are coming from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events are coming from a sync.
    Sync,

    /// Events are coming from pagination.
    Pagination,

    /// The cause of the change is purely internal to the cache.
    Cache,
}
713
714#[cfg(test)]
715mod tests {
716    use std::{ops::Not, sync::Arc, time::Duration};
717
718    use assert_matches::assert_matches;
719    use futures_util::FutureExt as _;
720    use matrix_sdk_base::{
721        RoomState,
722        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
723        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
724    };
725    use matrix_sdk_test::{
726        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
727    };
728    use ruma::{event_id, room_id, serde::Raw, user_id};
729    use serde_json::json;
730    use tokio::time::sleep;
731
732    use super::{EventCacheError, RoomEventCacheGenericUpdate, RoomEventCacheUpdate};
733    use crate::test_utils::{
734        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
735    };
736
737    #[async_test]
738    async fn test_must_explicitly_subscribe() {
739        let client = logged_in_client(None).await;
740
741        let event_cache = client.event_cache();
742
743        // If I create a room event subscriber for a room before subscribing the event
744        // cache,
745        let room_id = room_id!("!omelette:fromage.fr");
746        let result = event_cache.for_room(room_id).await;
747
748        // Then it fails, because one must explicitly call `.subscribe()` on the event
749        // cache.
750        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
751    }
752
753    #[async_test]
754    async fn test_uniq_read_marker() {
755        let client = logged_in_client(None).await;
756        let room_id = room_id!("!galette:saucisse.bzh");
757        client.base_client().get_or_create_room(room_id, RoomState::Joined);
758
759        let event_cache = client.event_cache();
760
761        event_cache.subscribe().unwrap();
762
763        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
764        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
765        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
766
767        assert!(events.is_empty());
768
769        // When sending multiple times the same read marker event,…
770        let read_marker_event = Raw::from_json_string(
771            json!({
772                "content": {
773                    "event_id": "$crepe:saucisse.bzh"
774                },
775                "room_id": "!galette:saucisse.bzh",
776                "type": "m.fully_read"
777            })
778            .to_string(),
779        )
780        .unwrap();
781        let account_data = vec![read_marker_event; 100];
782
783        room_event_cache
784            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
785            .await
786            .unwrap();
787
788        // … there's only one read marker update.
789        assert_matches!(
790            stream.recv().await.unwrap(),
791            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
792        );
793
794        assert!(stream.recv().now_or_never().is_none());
795
796        // None, because an account data doesn't trigger a generic update.
797        assert!(generic_stream.recv().now_or_never().is_none());
798    }
799
800    #[async_test]
801    async fn test_get_event_by_id() {
802        let client = logged_in_client(None).await;
803        let room_id1 = room_id!("!galette:saucisse.bzh");
804        let room_id2 = room_id!("!crepe:saucisse.bzh");
805
806        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
807        client.base_client().get_or_create_room(room_id2, RoomState::Joined);
808
809        let event_cache = client.event_cache();
810        event_cache.subscribe().unwrap();
811
812        // Insert two rooms with a few events.
813        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));
814
815        let eid1 = event_id!("$1");
816        let eid2 = event_id!("$2");
817        let eid3 = event_id!("$3");
818
819        let joined_room_update1 = JoinedRoomUpdate {
820            timeline: Timeline {
821                events: vec![
822                    f.text_msg("hey").event_id(eid1).into(),
823                    f.text_msg("you").event_id(eid2).into(),
824                ],
825                ..Default::default()
826            },
827            ..Default::default()
828        };
829
830        let joined_room_update2 = JoinedRoomUpdate {
831            timeline: Timeline {
832                events: vec![f.text_msg("bjr").event_id(eid3).into()],
833                ..Default::default()
834            },
835            ..Default::default()
836        };
837
838        let mut updates = RoomUpdates::default();
839        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
840        updates.joined.insert(room_id2.to_owned(), joined_room_update2);
841
842        // Have the event cache handle them.
843        event_cache.inner.handle_room_updates(updates).await.unwrap();
844
845        // We can find the events in a single room.
846        let room1 = client.get_room(room_id1).unwrap();
847
848        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();
849
850        let found1 = room_event_cache.find_event(eid1).await.unwrap().unwrap();
851        assert_event_matches_msg(&found1, "hey");
852
853        let found2 = room_event_cache.find_event(eid2).await.unwrap().unwrap();
854        assert_event_matches_msg(&found2, "you");
855
856        // Retrieving the event with id3 from the room which doesn't contain it will
857        // fail…
858        assert!(room_event_cache.find_event(eid3).await.unwrap().is_none());
859    }
860
861    #[async_test]
862    async fn test_save_event() {
863        let client = logged_in_client(None).await;
864        let room_id = room_id!("!galette:saucisse.bzh");
865
866        let event_cache = client.event_cache();
867        event_cache.subscribe().unwrap();
868
869        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
870        let event_id = event_id!("$1");
871
872        client.base_client().get_or_create_room(room_id, RoomState::Joined);
873        let room = client.get_room(room_id).unwrap();
874
875        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
876        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;
877
878        // Retrieving the event at the room-wide cache works.
879        assert!(room_event_cache.find_event(event_id).await.unwrap().is_some());
880    }
881
    /// A room that already has events in the event cache store must emit a
    /// generic update when it is first loaded via `for_room`; a room with no
    /// stored data must not emit anything.
    #[async_test]
    async fn test_generic_update_when_loading_rooms() {
        // Create 2 rooms. One of them has data in the event cache storage.
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id_0 = room_id!("!raclette:patate.ch");
        let room_id_1 = room_id!("!fondue:patate.ch");

        let event_factory = EventFactory::new().room(room_id_0).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        // Pre-populate the store for room 0 only: a single chunk containing
        // one event, written directly through the store lock.
        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id_0),
                vec![
                    // Non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        // Subscribe to generic updates only after the store has been seeded,
        // so only the `for_room` loads below can produce updates.
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Room 0 has initial data, so it must trigger a generic update.
        {
            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();

            assert_matches!(
                generic_stream.recv().await,
                Ok(RoomEventCacheGenericUpdate { room_id }) => {
                    assert_eq!(room_id, room_id_0);
                }
            );
        }

        // Room 1 has NO initial data, so nothing should happen.
        {
            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();

            assert!(generic_stream.recv().now_or_never().is_none());
        }
    }
950
    /// A backwards pagination must emit a generic update if, and only if, it
    /// actually loaded new events into the room's timeline; paginations that
    /// only traverse empty chunks stay silent.
    #[async_test]
    async fn test_generic_update_when_paginating_room() {
        // Create 1 room, with 4 chunks in the event cache storage.
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_factory = EventFactory::new().room(room_id).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        // Seed the store with, oldest to newest: two empty chunks, then two
        // chunks holding one event each ("hello", then "world").
        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // Empty chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // Empty chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    // Non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                    // Non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(2)),
                        new: ChunkIdentifier::new(3),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(3), 0),
                        items: vec![
                            event_factory
                                .text_msg("world")
                                .sender(user)
                                .event_id(event_id!("$ev1"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Room is initialised, it gets one event in the timeline.
        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let pagination = room_event_cache.pagination();

        // Paginate, it gets one new event in the timeline ("hello"): a generic
        // update must follow.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert_eq!(pagination_outcome.events.len(), 1);
        assert!(pagination_outcome.reached_start.not());
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        // Paginate, it gets zero new event in the timeline (an empty chunk):
        // no generic update this time.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.events.is_empty());
        assert!(pagination_outcome.reached_start.not());
        assert!(generic_stream.recv().now_or_never().is_none());

        // Paginate once more. Just checking our scenario is correct: the last
        // (empty) chunk is consumed, so we've reached the timeline start.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.reached_start);
        assert!(generic_stream.recv().now_or_never().is_none());
    }
1063
1064    #[async_test]
1065    async fn test_for_room_when_room_is_not_found() {
1066        let client = logged_in_client(None).await;
1067        let room_id = room_id!("!raclette:patate.ch");
1068
1069        let event_cache = client.event_cache();
1070        event_cache.subscribe().unwrap();
1071
1072        // Room doesn't exist. It returns an error.
1073        assert_matches!(
1074            event_cache.for_room(room_id).await,
1075            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
1076                assert_eq!(room_id, not_found_room_id);
1077            }
1078        );
1079
1080        // Now create the room.
1081        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1082
1083        // Room exists. Everything fine.
1084        assert!(event_cache.for_room(room_id).await.is_ok());
1085    }
1086
    /// Test that the event cache does not create reference cycles or tasks that
    /// retain its reference indefinitely, preventing it from being deallocated.
    #[cfg(not(target_family = "wasm"))]
    #[async_test]
    async fn test_no_refcycle_event_cache_tasks() {
        let client = MockClientBuilder::new(None).build().await;

        // Wait for the init tasks to die.
        sleep(Duration::from_secs(1)).await;

        // At this point the only strong reference to the inner event cache
        // should be the one held by the client itself.
        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
        assert_eq!(event_cache_weak.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            // Have the client know the room.
            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            // Obtaining a per-room event cache spawns background machinery;
            // both the room cache and its drop handles go away at the end of
            // this scope.
            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Give a bit of time for background tasks to die.
        sleep(Duration::from_secs(1)).await;

        // No strong counts should exist now that the Client has been dropped.
        assert_eq!(
            event_cache_weak.strong_count(),
            0,
            "Too many strong references to the event cache {}",
            event_cache_weak.strong_count()
        );
    }
1128}