matrix_sdk/event_cache/mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The event cache is an abstraction layer, sitting between the Rust SDK and a
16//! final client, that acts as a global observer of all the rooms, gathering and
17//! inferring some extra useful information about each room. In particular, this
18//! doesn't require subscribing to a specific room to get access to this
19//! information.
20//!
21//! It's intended to be fast, robust and easy to maintain, having learned from
22//! previous endeavours at implementing middle to high level features elsewhere
23//! in the SDK, notably in the UI's Timeline object.
24//!
25//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
26//! details about the historical reasons that led us to start writing this.
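//!
//! A rough usage sketch (assuming a logged-in [`Client`] called `client` and a
//! `room_id` the client already knows about; error handling is elided and the
//! snippet is not compiled as a doctest):
//!
//! ```ignore
//! // Opt the client into the event cache, once per `Client`.
//! let event_cache = client.event_cache();
//! event_cache.subscribe()?;
//!
//! // Get the room-specific view over the cache…
//! let room = client.get_room(room_id).expect("unknown room");
//! let (room_event_cache, _drop_handles) = room.event_cache().await?;
//!
//! // …then read the currently cached events and listen to further updates.
//! let (initial_events, mut updates) = room_event_cache.subscribe().await?;
//! while let Ok(update) = updates.recv().await {
//!     // React to each `RoomEventCacheUpdate` here.
//! }
//! ```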
27
28#![forbid(missing_docs)]
29
30use std::{
31    collections::{BTreeMap, HashMap},
32    fmt,
33    sync::{Arc, OnceLock, Weak},
34};
35
36use eyeball::{SharedObservable, Subscriber};
37use eyeball_im::VectorDiff;
38use futures_util::future::{join_all, try_join_all};
39use matrix_sdk_base::{
40    ThreadingSupport,
41    cross_process_lock::CrossProcessLockError,
42    deserialized_responses::{AmbiguityChange, TimelineEvent},
43    event_cache::{
44        Gap,
45        store::{EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockState},
46    },
47    executor::AbortOnDrop,
48    linked_chunk::{self, OwnedLinkedChunkId, lazy_loader::LazyLoaderError},
49    serde_helpers::extract_thread_root_from_content,
50    sync::RoomUpdates,
51    timer,
52};
53use matrix_sdk_common::executor::{JoinHandle, spawn};
54use ruma::{
55    OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId, events::AnySyncEphemeralRoomEvent,
56    serde::Raw,
57};
58use tokio::{
59    select,
60    sync::{
61        Mutex, RwLock,
62        broadcast::{Receiver, Sender, channel, error::RecvError},
63        mpsc,
64    },
65};
66use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, trace, warn};
67
68use crate::{
69    Client,
70    client::WeakClient,
71    event_cache::room::RoomEventCacheStateLock,
72    send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
73};
74
75mod deduplicator;
76mod pagination;
77#[cfg(feature = "e2e-encryption")]
78mod redecryptor;
79mod room;
80
81pub use pagination::{RoomPagination, RoomPaginationStatus};
82#[cfg(feature = "e2e-encryption")]
83pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport};
84pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate};
85
86/// An error observed in the [`EventCache`].
87#[derive(thiserror::Error, Debug)]
88pub enum EventCacheError {
89    /// The [`EventCache`] instance hasn't been initialized with
90    /// [`EventCache::subscribe`]
91    #[error(
92        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
93    )]
94    NotSubscribedYet,
95
96    /// Room is not found.
97    #[error("Room `{room_id}` is not found.")]
98    RoomNotFound {
        /// The ID of the room that wasn't found.
100        room_id: OwnedRoomId,
101    },
102
103    /// An error has been observed while back-paginating.
104    #[error(transparent)]
105    BackpaginationError(Box<crate::Error>),
106
    /// Back-pagination was already happening in a given room when we tried to
    /// back-paginate again.
109    #[error("We were already back-paginating.")]
110    AlreadyBackpaginating,
111
112    /// An error happening when interacting with storage.
113    #[error(transparent)]
114    Storage(#[from] EventCacheStoreError),
115
116    /// An error happening when attempting to (cross-process) lock storage.
117    #[error(transparent)]
118    LockingStorage(#[from] CrossProcessLockError),
119
120    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
    /// to. It's possible this weak reference points to nothing anymore by the
    /// time we try to use the client.
123    #[error("The owning client of the event cache has been dropped.")]
124    ClientDropped,
125
126    /// An error happening when interacting with the [`LinkedChunk`]'s lazy
127    /// loader.
128    ///
129    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
130    #[error(transparent)]
131    LinkedChunkLoader(#[from] LazyLoaderError),
132
133    /// An error happened when reading the metadata of a linked chunk, upon
134    /// reload.
135    #[error("the linked chunk metadata is invalid: {details}")]
136    InvalidLinkedChunkMetadata {
137        /// A string containing details about the error.
138        details: String,
139    },
140}
141
142/// A result using the [`EventCacheError`].
143pub type Result<T> = std::result::Result<T, EventCacheError>;
144
/// Holds handles to the tasks spawned by an [`EventCache`].
146pub struct EventCacheDropHandles {
147    /// Task that listens to room updates.
148    listen_updates_task: JoinHandle<()>,
149
150    /// Task that listens to updates to the user's ignored list.
151    ignore_user_list_update_task: JoinHandle<()>,
152
153    /// The task used to automatically shrink the linked chunks.
154    auto_shrink_linked_chunk_task: JoinHandle<()>,
155
156    /// The task used to automatically redecrypt UTDs.
157    #[cfg(feature = "e2e-encryption")]
158    _redecryptor: redecryptor::Redecryptor,
159}
160
161impl fmt::Debug for EventCacheDropHandles {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
164    }
165}
166
167impl Drop for EventCacheDropHandles {
168    fn drop(&mut self) {
169        self.listen_updates_task.abort();
170        self.ignore_user_list_update_task.abort();
171        self.auto_shrink_linked_chunk_task.abort();
172    }
173}
174
175/// An event cache, providing lots of useful functionality for clients.
176///
177/// Cloning is shallow, and thus is cheap to do.
178///
179/// See also the module-level comment.
180#[derive(Clone)]
181pub struct EventCache {
182    /// Reference to the inner cache.
183    inner: Arc<EventCacheInner>,
184}
185
186impl fmt::Debug for EventCache {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        f.debug_struct("EventCache").finish_non_exhaustive()
189    }
190}
191
192impl EventCache {
193    /// Create a new [`EventCache`] for the given client.
194    pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
195        let (generic_update_sender, _) = channel(32);
196        let (linked_chunk_update_sender, _) = channel(32);
197
198        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(32);
199        let thread_subscriber_task = AbortOnDrop::new(spawn(Self::thread_subscriber_task(
200            client.clone(),
201            linked_chunk_update_sender.clone(),
202            thread_subscriber_sender,
203        )));
204
205        #[cfg(feature = "experimental-search")]
206        let search_indexing_task = AbortOnDrop::new(spawn(Self::search_indexing_task(
207            client.clone(),
208            linked_chunk_update_sender.clone(),
209        )));
210
211        #[cfg(feature = "e2e-encryption")]
212        let redecryption_channels = redecryptor::RedecryptorChannels::new();
213
214        Self {
215            inner: Arc::new(EventCacheInner {
216                client,
217                store: event_cache_store,
218                multiple_room_updates_lock: Default::default(),
219                by_room: Default::default(),
220                drop_handles: Default::default(),
221                auto_shrink_sender: Default::default(),
222                generic_update_sender,
223                linked_chunk_update_sender,
224                _thread_subscriber_task: thread_subscriber_task,
225                #[cfg(feature = "experimental-search")]
226                _search_indexing_task: search_indexing_task,
227                #[cfg(feature = "e2e-encryption")]
228                redecryption_channels,
229                thread_subscriber_receiver,
230            }),
231        }
232    }
233
234    /// Subscribes to updates that a thread subscription has been sent.
235    ///
236    /// For testing purposes only.
237    #[doc(hidden)]
238    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
239        self.inner.thread_subscriber_receiver.resubscribe()
240    }
241
242    /// Starts subscribing the [`EventCache`] to sync responses, if not done
243    /// before.
244    ///
245    /// Re-running this has no effect if we already subscribed before, and is
246    /// cheap.
247    pub fn subscribe(&self) -> Result<()> {
248        let client = self.inner.client()?;
249
250        // Initialize the drop handles.
251        let _ = self.inner.drop_handles.get_or_init(|| {
252            // Spawn the task that will listen to all the room updates at once.
253            let listen_updates_task = spawn(Self::listen_task(
254                self.inner.clone(),
255                client.subscribe_to_all_room_updates(),
256            ));
257
258            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
259                self.inner.clone(),
260                client.subscribe_to_ignore_user_list_changes(),
261            ));
262
263            let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);
264
            // Force-initialize the sender in the [`EventCacheInner`].
266            self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);
267
268            let auto_shrink_linked_chunk_task = spawn(Self::auto_shrink_linked_chunk_task(
269                Arc::downgrade(&self.inner),
270                auto_shrink_receiver,
271            ));
272
273            #[cfg(feature = "e2e-encryption")]
274            let redecryptor = {
275                let receiver = self
276                    .inner
277                    .redecryption_channels
278                    .decryption_request_receiver
279                    .lock()
280                    .take()
                    .expect("We should have initialized the channel and subscribing should happen only once");
282
283                redecryptor::Redecryptor::new(Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender)
284            };
285
286
287            Arc::new(EventCacheDropHandles {
288                listen_updates_task,
289                ignore_user_list_update_task,
290                auto_shrink_linked_chunk_task,
291                #[cfg(feature = "e2e-encryption")]
292                _redecryptor: redecryptor,
293            })
294        });
295
296        Ok(())
297    }
298
299    #[instrument(skip_all)]
300    async fn ignore_user_list_update_task(
301        inner: Arc<EventCacheInner>,
302        mut ignore_user_list_stream: Subscriber<Vec<String>>,
303    ) {
304        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
305        span.follows_from(Span::current());
306
307        async move {
308            while ignore_user_list_stream.next().await.is_some() {
309                info!("Received an ignore user list change");
310                if let Err(err) = inner.clear_all_rooms().await {
311                    error!("when clearing room storage after ignore user list change: {err}");
312                }
313            }
314            info!("Ignore user list stream has closed");
315        }
316        .instrument(span)
317        .await;
318    }
319
320    /// For benchmarking purposes only.
321    #[doc(hidden)]
322    pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
323        self.inner.handle_room_updates(updates).await
324    }
325
326    #[instrument(skip_all)]
327    async fn listen_task(
328        inner: Arc<EventCacheInner>,
329        mut room_updates_feed: Receiver<RoomUpdates>,
330    ) {
331        trace!("Spawning the listen task");
332        loop {
333            match room_updates_feed.recv().await {
334                Ok(updates) => {
335                    trace!("Receiving `RoomUpdates`");
336
337                    if let Err(err) = inner.handle_room_updates(updates).await {
338                        match err {
339                            EventCacheError::ClientDropped => {
340                                // The client has dropped, exit the listen task.
341                                info!(
342                                    "Closing the event cache global listen task because client dropped"
343                                );
344                                break;
345                            }
346                            err => {
347                                error!("Error when handling room updates: {err}");
348                            }
349                        }
350                    }
351                }
352
353                Err(RecvError::Lagged(num_skipped)) => {
354                    // Forget everything we know; we could have missed events, and we have
355                    // no way to reconcile at the moment!
356                    // TODO: implement Smart Matching™,
357                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
358                    if let Err(err) = inner.clear_all_rooms().await {
359                        error!("when clearing storage after lag in listen_task: {err}");
360                    }
361                }
362
363                Err(RecvError::Closed) => {
364                    // The sender has shut down, exit.
365                    info!("Closing the event cache global listen task because receiver closed");
366                    break;
367                }
368            }
369        }
370    }
371
372    /// Spawns the task that will listen to auto-shrink notifications.
373    ///
    /// The auto-shrink mechanism works this way (see the sketch after the list):
375    ///
376    /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
377    ///   increment the active number of subscribers to that room, aka
378    ///   [`RoomEventCacheState::subscriber_count`].
379    /// - When that subscriber is dropped, it will decrement that count; and
380    ///   notify the task below if it reached 0.
381    /// - The task spawned here, owned by the [`EventCacheInner`], will listen
382    ///   to such notifications that a room may be shrunk. It will attempt an
383    ///   auto-shrink, by letting the inner state decide whether this is a good
384    ///   time to do so (new subscribers might have spawned in the meanwhile).
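    ///
    /// A rough sketch of the notifying side (hypothetical and simplified; the
    /// actual bookkeeping lives in the `room` module):
    ///
    /// ```ignore
    /// impl Drop for RoomEventCacheSubscriber {
    ///     fn drop(&mut self) {
    ///         // Decrement the number of active subscribers for this room…
    ///         let previous = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
    ///         // …and if we were the last one, notify the auto-shrink task that
    ///         // this room may now be shrunk.
    ///         if previous == 1 {
    ///             let _ = self.auto_shrink_sender.try_send(self.room_id.clone());
    ///         }
    ///     }
    /// }
    /// ```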
385    #[instrument(skip_all)]
386    async fn auto_shrink_linked_chunk_task(
387        inner: Weak<EventCacheInner>,
388        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
389    ) {
390        while let Some(room_id) = rx.recv().await {
391            trace!(for_room = %room_id, "received notification to shrink");
392
393            let Some(inner) = inner.upgrade() else {
394                return;
395            };
396
397            let room = match inner.for_room(&room_id).await {
398                Ok(room) => room,
399                Err(err) => {
400                    warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
401                    continue;
402                }
403            };
404
405            trace!("waiting for state lock…");
406            let mut state = match room.inner.state.write().await {
407                Ok(state) => state,
408                Err(err) => {
409                    warn!(for_room = %room_id, "Failed to get the `RoomEventCacheStateLock`: {err}");
410                    continue;
411                }
412            };
413
414            match state.auto_shrink_if_no_subscribers().await {
415                Ok(diffs) => {
416                    if let Some(diffs) = diffs {
417                        // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
418                        // subscribers, right? RIGHT? Especially because the state is guarded behind
419                        // a lock.
420                        //
421                        // However, better safe than sorry, and it's cheap to send an update here,
422                        // so let's do it!
423                        if !diffs.is_empty() {
424                            let _ = room.inner.update_sender.send(
425                                RoomEventCacheUpdate::UpdateTimelineEvents {
426                                    diffs,
427                                    origin: EventsOrigin::Cache,
428                                },
429                            );
430                        }
431                    } else {
432                        debug!("auto-shrinking didn't happen");
433                    }
434                }
435
436                Err(err) => {
437                    // There's not much we can do here, unfortunately.
438                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
439                }
440            }
441        }
442
443        info!("Auto-shrink linked chunk task has been closed, exiting");
444    }
445
446    /// Check whether [`EventCache::subscribe`] has been called.
447    pub fn has_subscribed(&self) -> bool {
448        self.inner.drop_handles.get().is_some()
449    }
450
451    /// Return a room-specific view over the [`EventCache`].
452    pub(crate) async fn for_room(
453        &self,
454        room_id: &RoomId,
455    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
456        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
457            return Err(EventCacheError::NotSubscribedYet);
458        };
459
460        let room = self.inner.for_room(room_id).await?;
461
462        Ok((room, drop_handles))
463    }
464
465    /// Cleanly clear all the rooms' event caches.
466    ///
467    /// This will notify any live observers that the room has been cleared.
468    pub async fn clear_all_rooms(&self) -> Result<()> {
469        self.inner.clear_all_rooms().await
470    }
471
472    /// Subscribe to room _generic_ updates.
473    ///
    /// If one wants to listen to what has changed in a specific room,
    /// [`RoomEventCache::subscribe`] is recommended. However, the
    /// [`RoomEventCacheSubscriber`] type triggers side-effects.
    ///
    /// If one wants a high-level, generic view of updates for all rooms,
    /// without side-effects, this method is recommended. Also, dropping the
    /// receiver of this channel will not trigger any side-effect.
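    ///
    /// A short sketch of listening to these updates (assuming an `event_cache`
    /// on which [`EventCache::subscribe`] has already been called):
    ///
    /// ```ignore
    /// let mut generic_updates = event_cache.subscribe_to_room_generic_updates();
    /// while let Ok(update) = generic_updates.recv().await {
    ///     // `update.room_id` tells us which room changed; query that room's
    ///     // `RoomEventCache` if we care about the details.
    ///     println!("something changed in {}", update.room_id);
    /// }
    /// ```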
481    pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
482        self.inner.generic_update_sender.subscribe()
483    }
484
485    /// React to a given linked chunk update by subscribing the user to a
486    /// thread, if needs be (when the user got mentioned in a thread reply, for
487    /// a thread they were not subscribed to).
488    ///
489    /// Returns a boolean indicating whether the task should keep on running or
490    /// not.
491    #[instrument(skip(client, thread_subscriber_sender))]
492    async fn handle_thread_subscriber_linked_chunk_update(
493        client: &WeakClient,
494        thread_subscriber_sender: &Sender<()>,
495        up: RoomEventCacheLinkedChunkUpdate,
496    ) -> bool {
497        let Some(client) = client.get() else {
498            // Client shutting down.
499            debug!("Client is shutting down, exiting thread subscriber task");
500            return false;
501        };
502
503        let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
504            trace!("received an update for a non-thread linked chunk, ignoring");
505            return true;
506        };
507
508        let Some(room) = client.get_room(room_id) else {
509            warn!(%room_id, "unknown room");
510            return true;
511        };
512
513        let thread_root = thread_root.clone();
514
515        let mut new_events = up.events().peekable();
516
517        if new_events.peek().is_none() {
518            // No new events, nothing to do.
519            return true;
520        }
521
522        // This `PushContext` is going to be used to compute whether an in-thread event
523        // would trigger a mention.
524        //
        // Of course, we're not interested in an in-thread event counting as a mention
        // merely because it's part of a thread we've subscribed to. So the
        // `PushContext` must not include the check for thread subscriptions (otherwise
        // it would be impossible to subscribe to new threads).
529
530        let with_thread_subscriptions = false;
531
532        let Some(push_context) = room
533            .push_context_internal(with_thread_subscriptions)
534            .await
535            .inspect_err(|err| {
536                warn!("Failed to get push context for threads: {err}");
537            })
538            .ok()
539            .flatten()
540        else {
541            warn!("Missing push context for thread subscriptions.");
542            return true;
543        };
544
545        let mut subscribe_up_to = None;
546
547        // Find if there's an event that would trigger a mention for the current
548        // user, iterating from the end of the new events towards the oldest, so we can
549        // find the most recent event to subscribe to.
550        for ev in new_events.rev() {
551            if push_context
552                .for_event(ev.raw())
553                .await
554                .into_iter()
555                .any(|action| action.should_notify())
556            {
557                let Some(event_id) = ev.event_id() else {
558                    // Shouldn't happen.
559                    continue;
560                };
561                subscribe_up_to = Some(event_id);
562                break;
563            }
564        }
565
566        // And if we've found such a mention, subscribe to the thread up to this
567        // event.
568        if let Some(event_id) = subscribe_up_to {
569            trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
570            if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
571                warn!(%err, "Failed to subscribe to thread");
572            } else {
573                let _ = thread_subscriber_sender.send(());
574            }
575        }
576
577        true
578    }
579
580    /// React to a given send queue update by subscribing the user to a
581    /// thread, if needs be (when the user sent an event in a thread they were
582    /// not subscribed to).
583    ///
584    /// Returns a boolean indicating whether the task should keep on running or
585    /// not.
586    #[instrument(skip(client, thread_subscriber_sender))]
587    async fn handle_thread_subscriber_send_queue_update(
588        client: &WeakClient,
589        thread_subscriber_sender: &Sender<()>,
590        events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
591        up: SendQueueUpdate,
592    ) -> bool {
593        let Some(client) = client.get() else {
594            // Client shutting down.
595            debug!("Client is shutting down, exiting thread subscriber task");
596            return false;
597        };
598
599        let room_id = up.room_id;
600        let Some(room) = client.get_room(&room_id) else {
601            warn!(%room_id, "unknown room");
602            return true;
603        };
604
605        let (thread_root, subscribe_up_to) = match up.update {
606            RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
607                match local_echo.content {
608                    LocalEchoContent::Event { serialized_event, .. } => {
609                        if let Some(thread_root) =
610                            extract_thread_root_from_content(serialized_event.into_raw().0)
611                        {
612                            events_being_sent.insert(local_echo.transaction_id, thread_root);
613                        }
614                    }
615                    LocalEchoContent::React { .. } => {
616                        // Nothing to do, reactions don't count as a thread
617                        // subscription.
618                    }
619                }
620                return true;
621            }
622
623            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
624                events_being_sent.remove(&transaction_id);
625                return true;
626            }
627
628            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
629                if let Some(thread_root) =
630                    extract_thread_root_from_content(new_content.into_raw().0)
631                {
632                    events_being_sent.insert(transaction_id, thread_root);
633                } else {
634                    // It could be that the event isn't part of a thread anymore; handle that by
635                    // removing the pending transaction id.
636                    events_being_sent.remove(&transaction_id);
637                }
638                return true;
639            }
640
641            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
642                if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
643                    (thread_root, event_id)
644                } else {
645                    // We don't know about the event that has been sent, so ignore it.
646                    trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
647                    return true;
648                }
649            }
650
651            RoomSendQueueUpdate::SendError { .. }
652            | RoomSendQueueUpdate::RetryEvent { .. }
653            | RoomSendQueueUpdate::MediaUpload { .. } => {
654                // Nothing to do for these bad boys.
655                return true;
656            }
657        };
658
        // The user sent an in-thread event, so subscribe them to the thread up to this event.
660        trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");
661        if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await
662        {
663            warn!(%err, "Failed to subscribe to thread");
664        } else {
665            let _ = thread_subscriber_sender.send(());
666        }
667
668        true
669    }
670
671    #[instrument(skip_all)]
672    async fn thread_subscriber_task(
673        client: WeakClient,
674        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
675        thread_subscriber_sender: Sender<()>,
676    ) {
677        let mut send_q_rx = if let Some(client) = client.get() {
678            if !client.enabled_thread_subscriptions() {
679                trace!("Thread subscriptions are not enabled, not spawning thread subscriber task");
680                return;
681            }
682
683            client.send_queue().subscribe()
684        } else {
685            trace!("Client is shutting down, not spawning thread subscriber task");
686            return;
687        };
688
689        let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();
690
691        // A mapping of local echoes (events being sent), to their thread root, if
692        // they're in an in-thread reply.
693        //
694        // Entirely managed by `handle_thread_subscriber_send_queue_update`.
695        let mut events_being_sent = HashMap::new();
696
697        loop {
698            select! {
699                res = send_q_rx.recv() => {
700                    match res {
701                        Ok(up) => {
702                            if !Self::handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
703                                break;
704                            }
705                        }
706                        Err(RecvError::Closed) => {
                            debug!("Send queue update channel has been closed, exiting thread subscriber task");
708                            break;
709                        }
710                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(num_skipped, "Lagged behind send queue updates");
712                        }
713                    }
714                }
715
716                res = linked_chunk_rx.recv() => {
717                    match res {
718                        Ok(up) => {
719                            if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
720                                break;
721                            }
722                        }
723                        Err(RecvError::Closed) => {
724                            debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
725                            break;
726                        }
727                        Err(RecvError::Lagged(num_skipped)) => {
728                            warn!(num_skipped, "Lagged behind linked chunk updates");
729                        }
730                    }
731                }
732            }
733        }
734    }
735
    /// Listens to linked chunk updates and passes each new [`TimelineEvent`] to
    /// the [`RoomIndex`] of the given room, which will add/remove/edit an event
    /// in the index based on the event type.
739    #[cfg(feature = "experimental-search")]
740    #[instrument(skip_all)]
741    async fn search_indexing_task(
742        client: WeakClient,
743        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
744    ) {
745        let mut linked_chunk_update_receiver = linked_chunk_update_sender.subscribe();
746
747        loop {
748            match linked_chunk_update_receiver.recv().await {
749                Ok(room_ec_lc_update) => {
750                    let OwnedLinkedChunkId::Room(room_id) =
751                        room_ec_lc_update.linked_chunk_id.clone()
752                    else {
753                        trace!("Received non-room updates, ignoring.");
754                        continue;
755                    };
756
757                    let mut timeline_events = room_ec_lc_update.events().peekable();
758
759                    if timeline_events.peek().is_none() {
760                        continue;
761                    }
762
763                    let Some(client) = client.get() else {
                        trace!("Client is shutting down, exiting the search indexing task");
765                        return;
766                    };
767
768                    let maybe_room_cache = client.event_cache().for_room(&room_id).await;
769                    let Ok((room_cache, _drop_handles)) = maybe_room_cache else {
770                        warn!(for_room = %room_id, "Failed to get RoomEventCache: {maybe_room_cache:?}");
771                        continue;
772                    };
773
774                    let maybe_room = client.get_room(&room_id);
775                    let Some(room) = maybe_room else {
776                        warn!(get_room = %room_id, "Failed to get room while indexing: {maybe_room:?}");
777                        continue;
778                    };
779                    let redaction_rules =
780                        room.clone_info().room_version_rules_or_default().redaction;
781
782                    let mut search_index_guard = client.search_index().lock().await;
783
784                    if let Err(err) = search_index_guard
785                        .bulk_handle_timeline_event(
786                            timeline_events,
787                            &room_cache,
788                            &room_id,
789                            &redaction_rules,
790                        )
791                        .await
792                    {
793                        error!("Failed to handle events for indexing: {err}")
794                    }
795                }
796                Err(RecvError::Closed) => {
797                    debug!(
                        "Linked chunk update channel has been closed, exiting the search indexing task"
799                    );
800                    break;
801                }
802                Err(RecvError::Lagged(num_skipped)) => {
803                    warn!(num_skipped, "Lagged behind linked chunk updates");
804                }
805            }
806        }
807    }
808}
809
810struct EventCacheInner {
811    /// A weak reference to the inner client, useful when trying to get a handle
812    /// on the owning client.
813    client: WeakClient,
814
815    /// Reference to the underlying store.
816    store: EventCacheStoreLock,
817
818    /// A lock used when many rooms must be updated at once.
819    ///
820    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
821    /// ensure that multiple updates will be applied in the correct order, which
822    /// is enforced by taking this lock when handling an update.
823    // TODO: that's the place to add a cross-process lock!
824    multiple_room_updates_lock: Mutex<()>,
825
826    /// Lazily-filled cache of live [`RoomEventCache`], once per room.
827    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,
828
829    /// Handles to keep alive the task listening to updates.
830    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
831
832    /// A sender for notifications that a room *may* need to be auto-shrunk.
833    ///
834    /// Needs to live here, so it may be passed to each [`RoomEventCache`]
835    /// instance.
836    ///
837    /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
838    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,
839
    /// A sender for generic room updates.
841    ///
842    /// See doc comment of [`RoomEventCacheGenericUpdate`] and
843    /// [`EventCache::subscribe_to_room_generic_updates`].
844    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
845
846    /// A sender for a persisted linked chunk update.
847    ///
848    /// This is used to notify that some linked chunk has persisted some updates
849    /// to a store, during sync or a back-pagination of *any* linked chunk.
850    /// This can be used by observers to look for new events.
851    ///
852    /// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
853    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
854
855    /// A background task listening to room and send queue updates, and
856    /// automatically subscribing the user to threads when needed, based on
857    /// the semantics of MSC4306.
858    ///
859    /// One important constraint is that there is only one such task per
860    /// [`EventCache`], so it does listen to *all* rooms at the same time.
861    _thread_subscriber_task: AbortOnDrop<()>,
862
863    /// A background task listening to room updates, and
864    /// automatically handling search index operations add/remove/edit
865    /// depending on the event type.
866    ///
867    /// One important constraint is that there is only one such task per
868    /// [`EventCache`], so it does listen to *all* rooms at the same time.
869    #[cfg(feature = "experimental-search")]
870    _search_indexing_task: AbortOnDrop<()>,
871
    /// A test helper receiver on which an update is emitted every time the
    /// thread subscriber task subscribes to a new thread.
874    ///
875    /// This is helpful for tests to coordinate that a new thread subscription
876    /// has been sent or not.
877    thread_subscriber_receiver: Receiver<()>,
878
879    #[cfg(feature = "e2e-encryption")]
880    redecryption_channels: redecryptor::RedecryptorChannels,
881}
882
883type AutoShrinkChannelPayload = OwnedRoomId;
884
885impl EventCacheInner {
886    fn client(&self) -> Result<Client> {
887        self.client.get().ok_or(EventCacheError::ClientDropped)
888    }
889
    /// Clears all the rooms' data.
891    async fn clear_all_rooms(&self) -> Result<()> {
892        // Okay, here's where things get complicated.
893        //
894        // On the one hand, `by_room` may include storage for *some* rooms that we know
895        // about, but not *all* of them. Any room that hasn't been loaded in the
896        // client, or touched by a sync, will remain unloaded in memory, so it
897        // will be missing from `self.by_room`. As a result, we need to make
898        // sure that we're hitting the storage backend to *really* clear all the
899        // rooms, including those that haven't been loaded yet.
900        //
901        // On the other hand, one must NOT clear the `by_room` map, because if someone
902        // subscribed to a room update, they would never get any new update for
903        // that room, since re-creating the `RoomEventCache` would create a new,
904        // unrelated sender.
905        //
906        // So we need to *keep* the rooms in `by_room` alive, while clearing them in the
907        // store backend.
908        //
909        // As a result, for a short while, the in-memory linked chunks
910        // will be desynchronized from the storage. We need to be careful then. During
911        // that short while, we don't want *anyone* to touch the linked chunk
912        // (be it in memory or in the storage).
913        //
914        // And since that requirement applies to *any* room in `by_room` at the same
915        // time, we'll have to take the locks for *all* the live rooms, so as to
916        // properly clear the underlying storage.
917        //
918        // At this point, you might be scared about the potential for deadlocking. I am
919        // as well, but I'm convinced we're fine:
920        // 1. the lock for `by_room` is usually held only for a short while, and
921        //    independently of the other two kinds.
922        // 2. the state may acquire the store cross-process lock internally, but only
923        //    while the state's methods are called (so it's always transient). As a
924        //    result, as soon as we've acquired the state locks, the store lock ought to
925        //    be free.
926        // 3. The store lock is held explicitly only in a small scoped area below.
927        // 4. Then the store lock will be held internally when calling `reset()`, but at
928        //    this point it's only held for a short while each time, so rooms will take
        //    turns to acquire it.
930
931        let rooms = self.by_room.write().await;
932
933        // Collect all the rooms' state locks, first: we can clear the storage only when
934        // nobody will touch it at the same time.
935        let room_locks = join_all(
936            rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
937        )
938        .await;
939
940        // Clear the storage for all the rooms, using the storage facility.
941        let store_guard = match self.store.lock().await? {
942            EventCacheStoreLockState::Clean(store_guard) => store_guard,
943            EventCacheStoreLockState::Dirty(store_guard) => store_guard,
944        };
945        store_guard.clear_all_linked_chunks().await?;
946
947        // At this point, all the in-memory linked chunks are desynchronized from the
948        // storage. Resynchronize them manually by calling reset(), and
949        // propagate updates to observers.
950        try_join_all(room_locks.into_iter().map(|(room, state_guard)| async move {
951            let mut state_guard = state_guard?;
952            let updates_as_vector_diffs = state_guard.reset().await?;
953
954            let _ = room.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
955                diffs: updates_as_vector_diffs,
956                origin: EventsOrigin::Cache,
957            });
958
959            let _ = room
960                .inner
961                .generic_update_sender
962                .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() });
963
964            Ok::<_, EventCacheError>(())
965        }))
966        .await?;
967
968        Ok(())
969    }
970
971    /// Handles a single set of room updates at once.
972    #[instrument(skip(self, updates))]
973    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
974        // First, take the lock that indicates we're processing updates, to avoid
975        // handling multiple updates concurrently.
976        let _lock = {
977            let _timer = timer!("Taking the `multiple_room_updates_lock`");
978            self.multiple_room_updates_lock.lock().await
979        };
980
981        // NOTE: bnjbvr tried to make this concurrent at some point, but it turned out
982        // to be a performance regression, even for large sync updates. Lacking
983        // time to investigate, this code remains sequential for now. See also
984        // https://github.com/matrix-org/matrix-rust-sdk/pull/5426.
985
986        // Left rooms.
987        for (room_id, left_room_update) in updates.left {
988            let room = self.for_room(&room_id).await?;
989
990            if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
991                // Non-fatal error, try to continue to the next room.
992                error!("handling left room update: {err}");
993            }
994        }
995
996        // Joined rooms.
997        for (room_id, joined_room_update) in updates.joined {
998            trace!(?room_id, "Handling a `JoinedRoomUpdate`");
999
1000            let room = self.for_room(&room_id).await?;
1001
1002            if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
1003                // Non-fatal error, try to continue to the next room.
1004                error!(%room_id, "handling joined room update: {err}");
1005            }
1006        }
1007
1008        // Invited rooms.
        // TODO: we don't do anything with `updates.invite` at this point.
1010
1011        Ok(())
1012    }
1013
1014    /// Return a room-specific view over the [`EventCache`].
1015    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
1016        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
1017        // write lock.
1018        let by_room_guard = self.by_room.read().await;
1019
1020        match by_room_guard.get(room_id) {
1021            Some(room) => Ok(room.clone()),
1022
1023            None => {
1024                // Slow-path: the entry doesn't exist; let's acquire a write lock.
1025                drop(by_room_guard);
1026                let mut by_room_guard = self.by_room.write().await;
1027
1028                // In the meanwhile, some other caller might have obtained write access and done
1029                // the same, so check for existence again.
1030                if let Some(room) = by_room_guard.get(room_id) {
1031                    return Ok(room.clone());
1032                }
1033
1034                let pagination_status =
1035                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });
1036
1037                let Some(client) = self.client.get() else {
1038                    return Err(EventCacheError::ClientDropped);
1039                };
1040
1041                let room = client
1042                    .get_room(room_id)
1043                    .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
1044                let room_version_rules = room.clone_info().room_version_rules_or_default();
1045
1046                let enabled_thread_support = matches!(
1047                    client.base_client().threading_support,
1048                    ThreadingSupport::Enabled { .. }
1049                );
1050
1051                let update_sender = Sender::new(32);
1052
1053                let room_state = RoomEventCacheStateLock::new(
1054                    room_id.to_owned(),
1055                    room_version_rules,
1056                    enabled_thread_support,
1057                    update_sender.clone(),
1058                    self.generic_update_sender.clone(),
1059                    self.linked_chunk_update_sender.clone(),
1060                    self.store.clone(),
1061                    pagination_status.clone(),
1062                )
1063                .await?;
1064
1065                let timeline_is_not_empty =
1066                    room_state.read().await?.room_linked_chunk().revents().next().is_some();
1067
1068                // SAFETY: we must have subscribed before reaching this code, otherwise
1069                // something is very wrong.
1070                let auto_shrink_sender =
1071                    self.auto_shrink_sender.get().cloned().expect(
1072                        "we must have called `EventCache::subscribe()` before calling here.",
1073                    );
1074
1075                let room_event_cache = RoomEventCache::new(
1076                    self.client.clone(),
1077                    room_state,
1078                    pagination_status,
1079                    room_id.to_owned(),
1080                    auto_shrink_sender,
1081                    update_sender,
1082                    self.generic_update_sender.clone(),
1083                );
1084
1085                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
1086
1087                // If at least one event has been loaded, it means there is a timeline. Let's
1088                // emit a generic update.
1089                if timeline_is_not_empty {
1090                    let _ = self
1091                        .generic_update_sender
1092                        .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
1093                }
1094
1095                Ok(room_event_cache)
1096            }
1097        }
1098    }
1099}
1100
1101/// The result of a single back-pagination request.
1102#[derive(Debug)]
1103pub struct BackPaginationOutcome {
1104    /// Did the back-pagination reach the start of the timeline?
1105    pub reached_start: bool,
1106
1107    /// All the events that have been returned in the back-pagination
1108    /// request.
1109    ///
1110    /// Events are presented in reverse order: the first element of the vec,
1111    /// if present, is the most "recent" event from the chunk (or
1112    /// technically, the last one in the topological ordering).
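    ///
    /// A small sketch of restoring topological (oldest to newest) order,
    /// assuming an `outcome: BackPaginationOutcome` at hand:
    ///
    /// ```ignore
    /// let mut chronological = outcome.events;
    /// chronological.reverse();
    /// // `chronological` now goes from the oldest to the most recent event.
    /// ```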
1113    pub events: Vec<TimelineEvent>,
1114}
1115
1116/// Represents a timeline update of a room. It hides the details of
1117/// [`RoomEventCacheUpdate`] by being more generic.
1118///
1119/// This is used by [`EventCache::subscribe_to_room_generic_updates`]. Please
1120/// read it to learn more about the motivation behind this type.
1121#[derive(Clone, Debug)]
1122pub struct RoomEventCacheGenericUpdate {
1123    /// The room ID owning the timeline.
1124    pub room_id: OwnedRoomId,
1125}
1126
1127/// An update being triggered when events change in the persisted event cache
1128/// for any room.
1129#[derive(Clone, Debug)]
1130struct RoomEventCacheLinkedChunkUpdate {
1131    /// The linked chunk affected by the update.
1132    linked_chunk_id: OwnedLinkedChunkId,
1133
1134    /// A vector of all the linked chunk updates that happened during this event
1135    /// cache update.
1136    updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
1137}
1138
1139impl RoomEventCacheLinkedChunkUpdate {
1140    /// Return all the new events propagated by this update, in topological
1141    /// order.
1142    pub fn events(self) -> impl DoubleEndedIterator<Item = TimelineEvent> {
1143        use itertools::Either;
1144        self.updates.into_iter().flat_map(|update| match update {
1145            linked_chunk::Update::PushItems { items, .. } => {
1146                Either::Left(Either::Left(items.into_iter()))
1147            }
1148            linked_chunk::Update::ReplaceItem { item, .. } => {
1149                Either::Left(Either::Right(std::iter::once(item)))
1150            }
1151            linked_chunk::Update::RemoveItem { .. }
1152            | linked_chunk::Update::DetachLastItems { .. }
1153            | linked_chunk::Update::StartReattachItems
1154            | linked_chunk::Update::EndReattachItems
1155            | linked_chunk::Update::NewItemsChunk { .. }
1156            | linked_chunk::Update::NewGapChunk { .. }
1157            | linked_chunk::Update::RemoveChunk(..)
1158            | linked_chunk::Update::Clear => {
1159                // All these updates don't contain any new event.
1160                Either::Right(std::iter::empty())
1161            }
1162        })
1163    }
1164}
1165
/// An update related to events that happened in a room.
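///
/// A sketch of consuming these updates, assuming a `stream` obtained through
/// [`RoomEventCache::subscribe`]:
///
/// ```ignore
/// while let Ok(update) = stream.recv().await {
///     match update {
///         RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
///             // Apply the `VectorDiff`s to a local copy of the room's timeline.
///         }
///         RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
///             // Move the fully-read marker in the UI.
///         }
///         _ => {}
///     }
/// }
/// ```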
1167#[derive(Debug, Clone)]
1168pub enum RoomEventCacheUpdate {
1169    /// The fully read marker has moved to a different event.
1170    MoveReadMarkerTo {
1171        /// Event at which the read marker is now pointing.
1172        event_id: OwnedEventId,
1173    },
1174
1175    /// The members have changed.
1176    UpdateMembers {
1177        /// Collection of ambiguity changes that room member events trigger.
1178        ///
1179        /// This is a map of event ID of the `m.room.member` event to the
1180        /// details of the ambiguity change.
1181        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
1182    },
1183
1184    /// The room has received updates for the timeline as _diffs_.
1185    UpdateTimelineEvents {
1186        /// Diffs to apply to the timeline.
1187        diffs: Vec<VectorDiff<TimelineEvent>>,
1188
1189        /// Where the diffs are coming from.
1190        origin: EventsOrigin,
1191    },
1192
1193    /// The room has received new ephemeral events.
1194    AddEphemeralEvents {
1195        /// XXX: this is temporary, until read receipts are handled in the event
1196        /// cache
1197        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1198    },
1199}
1200
1201/// Indicate where events are coming from.
1202#[derive(Debug, Clone)]
1203pub enum EventsOrigin {
1204    /// Events are coming from a sync.
1205    Sync,
1206
1207    /// Events are coming from pagination.
1208    Pagination,
1209
1210    /// The cause of the change is purely internal to the cache.
1211    Cache,
1212}
1213
1214#[cfg(test)]
1215mod tests {
1216    use std::{ops::Not, sync::Arc, time::Duration};
1217
1218    use assert_matches::assert_matches;
1219    use futures_util::FutureExt as _;
1220    use matrix_sdk_base::{
1221        RoomState,
1222        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
1223        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
1224    };
1225    use matrix_sdk_test::{
1226        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
1227    };
1228    use ruma::{event_id, room_id, serde::Raw, user_id};
1229    use serde_json::json;
1230    use tokio::time::sleep;
1231
1232    use super::{EventCacheError, RoomEventCacheGenericUpdate, RoomEventCacheUpdate};
1233    use crate::test_utils::{
1234        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
1235    };
1236
1237    #[async_test]
1238    async fn test_must_explicitly_subscribe() {
1239        let client = logged_in_client(None).await;
1240
1241        let event_cache = client.event_cache();
1242
1243        // If I create a room event subscriber for a room before subscribing the event
1244        // cache,
1245        let room_id = room_id!("!omelette:fromage.fr");
1246        let result = event_cache.for_room(room_id).await;
1247
1248        // Then it fails, because one must explicitly call `.subscribe()` on the event
1249        // cache.
1250        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
1251    }
1252
1253    #[async_test]
1254    async fn test_uniq_read_marker() {
1255        let client = logged_in_client(None).await;
1256        let room_id = room_id!("!galette:saucisse.bzh");
1257        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1258
1259        let event_cache = client.event_cache();
1260
1261        event_cache.subscribe().unwrap();
1262
1263        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1264        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
1265        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
1266
1267        assert!(events.is_empty());
1268
        // When sending the same read marker event multiple times,…
1270        let read_marker_event = Raw::from_json_string(
1271            json!({
1272                "content": {
1273                    "event_id": "$crepe:saucisse.bzh"
1274                },
1275                "room_id": "!galette:saucisse.bzh",
1276                "type": "m.fully_read"
1277            })
1278            .to_string(),
1279        )
1280        .unwrap();
1281        let account_data = vec![read_marker_event; 100];
1282
1283        room_event_cache
1284            .inner
1285            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
1286            .await
1287            .unwrap();
1288
1289        // … there's only one read marker update.
1290        assert_matches!(
1291            stream.recv().await.unwrap(),
1292            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
1293        );
1294
1295        assert!(stream.recv().now_or_never().is_none());
1296
        // None, because account data doesn't trigger a generic update.
1298        assert!(generic_stream.recv().now_or_never().is_none());
1299    }
1300
    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
        client.base_client().get_or_create_room(room_id2, RoomState::Joined);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Insert two rooms with a few events.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
        updates.joined.insert(room_id2.to_owned(), joined_room_update2);

        // Have the event cache handle them.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // We can find the events in a single room.
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.find_event(eid1).await.unwrap().unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.find_event(eid2).await.unwrap().unwrap();
        assert_event_matches_msg(&found2, "you");

        // Retrieving the event with id3 from a room which doesn't contain it
        // returns `None`.
        assert!(room_event_cache.find_event(eid3).await.unwrap().is_none());
    }

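    /// Check that an event saved with `save_events` can then be retrieved
    /// again via `find_event` from the same room's cache.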
    #[async_test]
    async fn test_save_event() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;

        // Retrieving the event from the room-wide cache works.
        assert!(room_event_cache.find_event(event_id).await.unwrap().is_some());
    }

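    /// Check that a generic update is emitted when a room that has data in
    /// the event cache storage is loaded, and that nothing is emitted for a
    /// room without any stored data.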
    #[async_test]
    async fn test_generic_update_when_loading_rooms() {
        // Create 2 rooms. One of them has data in the event cache storage.
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id_0 = room_id!("!raclette:patate.ch");
        let room_id_1 = room_id!("!fondue:patate.ch");

        let event_factory = EventFactory::new().room(room_id_0).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id_0),
                vec![
                    // Non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Room 0 has initial data, so it must trigger a generic update.
        {
            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();

            assert_matches!(
                generic_stream.recv().await,
                Ok(RoomEventCacheGenericUpdate { room_id }) => {
                    assert_eq!(room_id, room_id_0);
                }
            );
        }

        // Room 1 has NO initial data, so nothing should happen.
        {
            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();

            assert!(generic_stream.recv().now_or_never().is_none());
        }
    }

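    /// Check that a backwards pagination emits a generic update only when it
    /// actually adds new events to the timeline.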
    #[async_test]
    async fn test_generic_update_when_paginating_room() {
        // Create 1 room, with 4 chunks in the event cache storage.
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_factory = EventFactory::new().room(room_id).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // Empty chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // Empty chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    // Non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![
                            event_factory
                                .text_msg("hello")
                                .sender(user)
                                .event_id(event_id!("$ev0"))
                                .into_event(),
                        ],
                    },
                    // Non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(2)),
                        new: ChunkIdentifier::new(3),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(3), 0),
                        items: vec![
                            event_factory
                                .text_msg("world")
                                .sender(user)
                                .event_id(event_id!("$ev1"))
                                .into_event(),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // The room is initialised and gets one event in the timeline.
        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

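        // The older chunks only exist in the event cache store, so each
        // backwards pagination below is expected to load at most one of them
        // from storage.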
        let pagination = room_event_cache.pagination();

        // Paginate: it adds one new event to the timeline.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert_eq!(pagination_outcome.events.len(), 1);
        assert!(pagination_outcome.reached_start.not());
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        // Paginate again: it adds no new events to the timeline, so there is
        // no generic update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.events.is_empty());
        assert!(pagination_outcome.reached_start.not());
        assert!(generic_stream.recv().now_or_never().is_none());

        // Paginate once more, just to check that our scenario is correct.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.reached_start);
        assert!(generic_stream.recv().now_or_never().is_none());
    }

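    /// Check that `for_room` returns a `RoomNotFound` error for an unknown
    /// room, and succeeds once the room has been created.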
    #[async_test]
    async fn test_for_room_when_room_is_not_found() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // The room doesn't exist yet, so an error is returned.
        assert_matches!(
            event_cache.for_room(room_id).await,
            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
                assert_eq!(room_id, not_found_room_id);
            }
        );

        // Now create the room.
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        // The room exists: everything is fine.
        assert!(event_cache.for_room(room_id).await.is_ok());
    }

    /// Test that the event cache doesn't create reference cycles or spawn
    /// tasks that retain a reference to it indefinitely, which would prevent
    /// it from being deallocated.
    #[cfg(not(target_family = "wasm"))]
    #[async_test]
    async fn test_no_refcycle_event_cache_tasks() {
        let client = MockClientBuilder::new(None).build().await;

        // Wait for the init tasks to die.
        sleep(Duration::from_secs(1)).await;

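        // Grab a weak reference to the event cache's inner state: at this
        // point, the client holds the only strong reference to it.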
        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
        assert_eq!(event_cache_weak.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            // Make the client aware of the room.
            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Give a bit of time for background tasks to die.
        sleep(Duration::from_secs(1)).await;

        // No strong reference should remain now that the Client has been dropped.
        assert_eq!(
            event_cache_weak.strong_count(),
            0,
            "Too many strong references to the event cache {}",
            event_cache_weak.strong_count()
        );
    }
}