matrix_sdk/event_cache/mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The event cache is an abstraction layer, sitting between the Rust SDK and a
16//! final client, that acts as a global observer of all the rooms, gathering and
17//! inferring some extra useful information about each room. In particular, this
18//! doesn't require subscribing to a specific room to get access to this
19//! information.
20//!
21//! It's intended to be fast, robust and easy to maintain, having learned from
22//! previous endeavours at implementing middle to high level features elsewhere
23//! in the SDK, notably in the UI's Timeline object.
24//!
25//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
26//! details about the historical reasons that led us to start writing this.
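//!
//! A rough usage sketch (marked `ignore` since it elides client setup and
//! error handling; `client` and `room` are assumed to come from the usual SDK
//! entry points):
//!
//! ```ignore
//! // Enable the event cache for this client, once.
//! let event_cache = client.event_cache();
//! event_cache.subscribe()?;
//!
//! // Get a room-wide view over the cached events.
//! let (room_event_cache, _drop_handles) = room.event_cache().await?;
//!
//! // Read the currently cached events and listen to further updates.
//! let (initial_events, mut updates) = room_event_cache.subscribe().await;
//! while let Ok(update) = updates.recv().await {
//!     // React to each `RoomEventCacheUpdate` here.
//! }
//! ```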
27
28#![forbid(missing_docs)]
29
30use std::{
31    collections::{BTreeMap, HashMap},
32    fmt,
33    sync::{Arc, OnceLock, Weak},
34};
35
36use eyeball::{SharedObservable, Subscriber};
37use eyeball_im::VectorDiff;
38use futures_util::future::{join_all, try_join_all};
39use matrix_sdk_base::{
40    ThreadingSupport,
41    cross_process_lock::CrossProcessLockError,
42    deserialized_responses::{AmbiguityChange, TimelineEvent},
43    event_cache::{
44        Gap,
45        store::{EventCacheStoreError, EventCacheStoreLock},
46    },
47    executor::AbortOnDrop,
48    linked_chunk::{self, OwnedLinkedChunkId, lazy_loader::LazyLoaderError},
49    serde_helpers::extract_thread_root_from_content,
50    sync::RoomUpdates,
51    timer,
52};
53use matrix_sdk_common::executor::{JoinHandle, spawn};
54use room::RoomEventCacheState;
55use ruma::{
56    OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId, events::AnySyncEphemeralRoomEvent,
57    serde::Raw,
58};
59use tokio::{
60    select,
61    sync::{
62        Mutex, RwLock,
63        broadcast::{Receiver, Sender, channel, error::RecvError},
64        mpsc,
65    },
66};
67use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, trace, warn};
68
69use crate::{
70    Client,
71    client::WeakClient,
72    send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
73};
74
75mod deduplicator;
76mod pagination;
77mod room;
78
79pub use pagination::{RoomPagination, RoomPaginationStatus};
80pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate};
81
82/// An error observed in the [`EventCache`].
83#[derive(thiserror::Error, Debug)]
84pub enum EventCacheError {
85    /// The [`EventCache`] instance hasn't been initialized with
86    /// [`EventCache::subscribe`]
87    #[error(
88        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
89    )]
90    NotSubscribedYet,
91
92    /// Room is not found.
93    #[error("Room `{room_id}` is not found.")]
94    RoomNotFound {
95        /// The ID of the room that wasn't found.
96        room_id: OwnedRoomId,
97    },
98
99    /// An error has been observed while back-paginating.
100    #[error(transparent)]
101    BackpaginationError(Box<crate::Error>),
102
103    /// A back-pagination was already happening in the given room when we tried
104    /// to back-paginate again.
105    #[error("We were already back-paginating.")]
106    AlreadyBackpaginating,
107
108    /// An error happening when interacting with storage.
109    #[error(transparent)]
110    Storage(#[from] EventCacheStoreError),
111
112    /// An error happening when attempting to (cross-process) lock storage.
113    #[error(transparent)]
114    LockingStorage(#[from] CrossProcessLockError),
115
116    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
117    /// to. It's possible this weak reference points to nothing anymore by the
118    /// time we try to use the client.
119    #[error("The owning client of the event cache has been dropped.")]
120    ClientDropped,
121
122    /// An error happening when interacting with the [`LinkedChunk`]'s lazy
123    /// loader.
124    ///
125    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
126    #[error(transparent)]
127    LinkedChunkLoader(#[from] LazyLoaderError),
128
129    /// An error happened when reading the metadata of a linked chunk, upon
130    /// reload.
131    #[error("the linked chunk metadata is invalid: {details}")]
132    InvalidLinkedChunkMetadata {
133        /// A string containing details about the error.
134        details: String,
135    },
136}
137
138/// A result using the [`EventCacheError`].
139pub type Result<T> = std::result::Result<T, EventCacheError>;
140
141/// Hold handles to the tasks spawned by an [`EventCache`].
142pub struct EventCacheDropHandles {
143    /// Task that listens to room updates.
144    listen_updates_task: JoinHandle<()>,
145
146    /// Task that listens to updates to the user's ignored list.
147    ignore_user_list_update_task: JoinHandle<()>,
148
149    /// The task used to automatically shrink the linked chunks.
150    auto_shrink_linked_chunk_task: JoinHandle<()>,
151}
152
153impl fmt::Debug for EventCacheDropHandles {
154    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
156    }
157}
158
159impl Drop for EventCacheDropHandles {
160    fn drop(&mut self) {
161        self.listen_updates_task.abort();
162        self.ignore_user_list_update_task.abort();
163        self.auto_shrink_linked_chunk_task.abort();
164    }
165}
166
167/// An event cache, providing lots of useful functionality for clients.
168///
169/// Cloning is shallow, and thus is cheap to do.
170///
171/// See also the module-level comment.
172#[derive(Clone)]
173pub struct EventCache {
174    /// Reference to the inner cache.
175    inner: Arc<EventCacheInner>,
176}
177
178impl fmt::Debug for EventCache {
179    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180        f.debug_struct("EventCache").finish_non_exhaustive()
181    }
182}
183
184impl EventCache {
185    /// Create a new [`EventCache`] for the given client.
186    pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
187        let (generic_update_sender, _) = channel(32);
188        let (linked_chunk_update_sender, _) = channel(32);
189
190        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(32);
191        let thread_subscriber_task = AbortOnDrop::new(spawn(Self::thread_subscriber_task(
192            client.clone(),
193            linked_chunk_update_sender.clone(),
194            thread_subscriber_sender,
195        )));
196
197        #[cfg(feature = "experimental-search")]
198        let search_indexing_task = AbortOnDrop::new(spawn(Self::search_indexing_task(
199            client.clone(),
200            linked_chunk_update_sender.clone(),
201        )));
202
203        Self {
204            inner: Arc::new(EventCacheInner {
205                client,
206                store: event_cache_store,
207                multiple_room_updates_lock: Default::default(),
208                by_room: Default::default(),
209                drop_handles: Default::default(),
210                auto_shrink_sender: Default::default(),
211                generic_update_sender,
212                linked_chunk_update_sender,
213                _thread_subscriber_task: thread_subscriber_task,
214                #[cfg(feature = "experimental-search")]
215                _search_indexing_task: search_indexing_task,
216                thread_subscriber_receiver,
217            }),
218        }
219    }
220
221    /// Subscribes to notifications that a thread subscription has been sent.
222    ///
223    /// For testing purposes only.
224    #[doc(hidden)]
225    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
226        self.inner.thread_subscriber_receiver.resubscribe()
227    }
228
229    /// Starts subscribing the [`EventCache`] to sync responses, if not done
230    /// before.
231    ///
232    /// Re-running this has no effect if we already subscribed before, and is
233    /// cheap.
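    ///
    /// A minimal sketch of the expected call order (marked `ignore`; it
    /// assumes an already logged-in `client` and a known `room`):
    ///
    /// ```ignore
    /// let event_cache = client.event_cache();
    /// // Without this call, room-level accessors fail with
    /// // `EventCacheError::NotSubscribedYet`.
    /// event_cache.subscribe()?;
    /// let (room_event_cache, _drop_handles) = room.event_cache().await?;
    /// ```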
234    pub fn subscribe(&self) -> Result<()> {
235        let client = self.inner.client()?;
236
237        // Initialize the drop handles.
238        let _ = self.inner.drop_handles.get_or_init(|| {
239            // Spawn the task that will listen to all the room updates at once.
240            let listen_updates_task = spawn(Self::listen_task(
241                self.inner.clone(),
242                client.subscribe_to_all_room_updates(),
243            ));
244
245            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
246                self.inner.clone(),
247                client.subscribe_to_ignore_user_list_changes(),
248            ));
249
250            let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);
251
252            // Force-initialize the sender in the [`RoomEventCacheInner`].
253            self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);
254
255            let auto_shrink_linked_chunk_task = spawn(Self::auto_shrink_linked_chunk_task(
256                Arc::downgrade(&self.inner),
257                auto_shrink_receiver,
258            ));
259
260            Arc::new(EventCacheDropHandles {
261                listen_updates_task,
262                ignore_user_list_update_task,
263                auto_shrink_linked_chunk_task,
264            })
265        });
266
267        Ok(())
268    }
269
270    #[instrument(skip_all)]
271    async fn ignore_user_list_update_task(
272        inner: Arc<EventCacheInner>,
273        mut ignore_user_list_stream: Subscriber<Vec<String>>,
274    ) {
275        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
276        span.follows_from(Span::current());
277
278        async move {
279            while ignore_user_list_stream.next().await.is_some() {
280                info!("Received an ignore user list change");
281                if let Err(err) = inner.clear_all_rooms().await {
282                    error!("when clearing room storage after ignore user list change: {err}");
283                }
284            }
285            info!("Ignore user list stream has closed");
286        }
287        .instrument(span)
288        .await;
289    }
290
291    /// For benchmarking purposes only.
292    #[doc(hidden)]
293    pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
294        self.inner.handle_room_updates(updates).await
295    }
296
297    #[instrument(skip_all)]
298    async fn listen_task(
299        inner: Arc<EventCacheInner>,
300        mut room_updates_feed: Receiver<RoomUpdates>,
301    ) {
302        trace!("Spawning the listen task");
303        loop {
304            match room_updates_feed.recv().await {
305                Ok(updates) => {
306                    trace!("Receiving `RoomUpdates`");
307
308                    if let Err(err) = inner.handle_room_updates(updates).await {
309                        match err {
310                            EventCacheError::ClientDropped => {
311                                // The client has dropped, exit the listen task.
312                                info!(
313                                    "Closing the event cache global listen task because client dropped"
314                                );
315                                break;
316                            }
317                            err => {
318                                error!("Error when handling room updates: {err}");
319                            }
320                        }
321                    }
322                }
323
324                Err(RecvError::Lagged(num_skipped)) => {
325                    // Forget everything we know; we could have missed events, and we have
326                    // no way to reconcile at the moment!
327                    // TODO: implement Smart Matching™,
328                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
329                    if let Err(err) = inner.clear_all_rooms().await {
330                        error!("when clearing storage after lag in listen_task: {err}");
331                    }
332                }
333
334                Err(RecvError::Closed) => {
335                    // The sender has shut down, exit.
336                    info!("Closing the event cache global listen task because receiver closed");
337                    break;
338                }
339            }
340        }
341    }
342
343    /// Spawns the task that will listen to auto-shrink notifications.
344    ///
345    /// The auto-shrink mechanism works this way:
346    ///
347    /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
348    ///   increment the active number of subscribers to that room, aka
349    ///   [`RoomEventCacheState::subscriber_count`].
350    /// - When that subscriber is dropped, it will decrement that count; and
351    ///   notify the task below if it reached 0.
352    /// - The task spawned here, owned by the [`EventCacheInner`], will listen
353    ///   to such notifications that a room may be shrunk. It will attempt an
354    ///   auto-shrink, by letting the inner state decide whether this is a good
355    ///   time to do so (new subscribers might have spawned in the meanwhile).
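    ///
    /// A rough sketch of the notification side (hypothetical names; the real
    /// logic lives with the subscriber bookkeeping, not here):
    ///
    /// ```ignore
    /// // When the last subscriber of a room goes away…
    /// if subscriber_count.fetch_sub(1, Ordering::SeqCst) == 1 {
    ///     // …nudge the auto-shrink task with the room's id. Shrinking is
    ///     // best-effort, so a failed send is fine.
    ///     let _ = auto_shrink_sender.try_send(room_id.to_owned());
    /// }
    /// ```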
356    #[instrument(skip_all)]
357    async fn auto_shrink_linked_chunk_task(
358        inner: Weak<EventCacheInner>,
359        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
360    ) {
361        while let Some(room_id) = rx.recv().await {
362            trace!(for_room = %room_id, "received notification to shrink");
363
364            let Some(inner) = inner.upgrade() else {
365                return;
366            };
367
368            let room = match inner.for_room(&room_id).await {
369                Ok(room) => room,
370                Err(err) => {
371                    warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
372                    continue;
373                }
374            };
375
376            trace!("waiting for state lock…");
377            let mut state = room.inner.state.write().await;
378
379            match state.auto_shrink_if_no_subscribers().await {
380                Ok(diffs) => {
381                    if let Some(diffs) = diffs {
382                        // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
383                        // subscribers, right? RIGHT? Especially because the state is guarded behind
384                        // a lock.
385                        //
386                        // However, better safe than sorry, and it's cheap to send an update here,
387                        // so let's do it!
388                        if !diffs.is_empty() {
389                            let _ = room.inner.sender.send(
390                                RoomEventCacheUpdate::UpdateTimelineEvents {
391                                    diffs,
392                                    origin: EventsOrigin::Cache,
393                                },
394                            );
395                        }
396                    } else {
397                        debug!("auto-shrinking didn't happen");
398                    }
399                }
400
401                Err(err) => {
402                    // There's not much we can do here, unfortunately.
403                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
404                }
405            }
406        }
407
408        info!("Auto-shrink linked chunk task has been closed, exiting");
409    }
410
411    /// Return a room-specific view over the [`EventCache`].
412    pub(crate) async fn for_room(
413        &self,
414        room_id: &RoomId,
415    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
416        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
417            return Err(EventCacheError::NotSubscribedYet);
418        };
419
420        let room = self.inner.for_room(room_id).await?;
421
422        Ok((room, drop_handles))
423    }
424
425    /// Cleanly clear all the rooms' event caches.
426    ///
427    /// This will notify any live observers that the room has been cleared.
428    pub async fn clear_all_rooms(&self) -> Result<()> {
429        self.inner.clear_all_rooms().await
430    }
431
432    /// Subscribe to room _generic_ updates.
433    ///
434    /// If one wants to listen to what has changed in a specific room,
435    /// [`RoomEventCache::subscribe`] is recommended. However, the
436    /// [`RoomEventCacheSubscriber`] type triggers side-effects.
437    ///
438    /// If one wants high-level, generic updates for all rooms, without
439    /// side-effects, this method is recommended. Also, dropping the
440    /// receiver of this channel will not trigger any side-effect.
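    ///
    /// A short consumption sketch (marked `ignore`; assumes a subscribed
    /// `event_cache`):
    ///
    /// ```ignore
    /// let mut generic_updates = event_cache.subscribe_to_room_generic_updates();
    /// while let Ok(RoomEventCacheGenericUpdate { room_id }) = generic_updates.recv().await {
    ///     // Something changed in `room_id`; re-read what you need from its
    ///     // `RoomEventCache` if that room is of interest.
    /// }
    /// ```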
441    pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
442        self.inner.generic_update_sender.subscribe()
443    }
444
445    /// React to a given linked chunk update by subscribing the user to a
446    /// thread, if need be (when the user got mentioned in a thread reply, for
447    /// a thread they were not subscribed to).
448    ///
449    /// Returns a boolean indicating whether the task should keep on running or
450    /// not.
451    #[instrument(skip(client, thread_subscriber_sender))]
452    async fn handle_thread_subscriber_linked_chunk_update(
453        client: &WeakClient,
454        thread_subscriber_sender: &Sender<()>,
455        up: RoomEventCacheLinkedChunkUpdate,
456    ) -> bool {
457        let Some(client) = client.get() else {
458            // Client shutting down.
459            debug!("Client is shutting down, exiting thread subscriber task");
460            return false;
461        };
462
463        let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
464            trace!("received an update for a non-thread linked chunk, ignoring");
465            return true;
466        };
467
468        let Some(room) = client.get_room(room_id) else {
469            warn!(%room_id, "unknown room");
470            return true;
471        };
472
473        let thread_root = thread_root.clone();
474
475        let mut new_events = up.events().peekable();
476
477        if new_events.peek().is_none() {
478            // No new events, nothing to do.
479            return true;
480        }
481
482        // This `PushContext` is going to be used to compute whether an in-thread event
483        // would trigger a mention.
484        //
485        // Of course, we're not interested in an in-thread event causing a mention
486        // merely because it's part of a thread we've already subscribed to. So the
487        // `PushContext` must not include the check for thread subscriptions (otherwise
488        // it would be impossible to subscribe to new threads).
489
490        let with_thread_subscriptions = false;
491
492        let Some(push_context) = room
493            .push_context_internal(with_thread_subscriptions)
494            .await
495            .inspect_err(|err| {
496                warn!("Failed to get push context for threads: {err}");
497            })
498            .ok()
499            .flatten()
500        else {
501            warn!("Missing push context for thread subscriptions.");
502            return true;
503        };
504
505        let mut subscribe_up_to = None;
506
507        // Find if there's an event that would trigger a mention for the current
508        // user, iterating from the end of the new events towards the oldest, so we can
509        // find the most recent event to subscribe to.
510        for ev in new_events.rev() {
511            if push_context
512                .for_event(ev.raw())
513                .await
514                .into_iter()
515                .any(|action| action.should_notify())
516            {
517                let Some(event_id) = ev.event_id() else {
518                    // Shouldn't happen.
519                    continue;
520                };
521                subscribe_up_to = Some(event_id);
522                break;
523            }
524        }
525
526        // And if we've found such a mention, subscribe to the thread up to this
527        // event.
528        if let Some(event_id) = subscribe_up_to {
529            trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
530            if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
531                warn!(%err, "Failed to subscribe to thread");
532            } else {
533                let _ = thread_subscriber_sender.send(());
534            }
535        }
536
537        true
538    }
539
540    /// React to a given send queue update by subscribing the user to a
541    /// thread, if need be (when the user sent an event in a thread they were
542    /// not subscribed to).
543    ///
544    /// Returns a boolean indicating whether the task should keep on running or
545    /// not.
546    #[instrument(skip(client, thread_subscriber_sender))]
547    async fn handle_thread_subscriber_send_queue_update(
548        client: &WeakClient,
549        thread_subscriber_sender: &Sender<()>,
550        events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
551        up: SendQueueUpdate,
552    ) -> bool {
553        let Some(client) = client.get() else {
554            // Client shutting down.
555            debug!("Client is shutting down, exiting thread subscriber task");
556            return false;
557        };
558
559        let room_id = up.room_id;
560        let Some(room) = client.get_room(&room_id) else {
561            warn!(%room_id, "unknown room");
562            return true;
563        };
564
565        let (thread_root, subscribe_up_to) = match up.update {
566            RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
567                match local_echo.content {
568                    LocalEchoContent::Event { serialized_event, .. } => {
569                        if let Some(thread_root) =
570                            extract_thread_root_from_content(serialized_event.into_raw().0)
571                        {
572                            events_being_sent.insert(local_echo.transaction_id, thread_root);
573                        }
574                    }
575                    LocalEchoContent::React { .. } => {
576                        // Nothing to do, reactions don't count as a thread
577                        // subscription.
578                    }
579                }
580                return true;
581            }
582
583            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
584                events_being_sent.remove(&transaction_id);
585                return true;
586            }
587
588            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
589                if let Some(thread_root) =
590                    extract_thread_root_from_content(new_content.into_raw().0)
591                {
592                    events_being_sent.insert(transaction_id, thread_root);
593                } else {
594                    // It could be that the event isn't part of a thread anymore; handle that by
595                    // removing the pending transaction id.
596                    events_being_sent.remove(&transaction_id);
597                }
598                return true;
599            }
600
601            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
602                if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
603                    (thread_root, event_id)
604                } else {
605                    // We don't know about the event that has been sent, so ignore it.
606                    trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
607                    return true;
608                }
609            }
610
611            RoomSendQueueUpdate::SendError { .. }
612            | RoomSendQueueUpdate::RetryEvent { .. }
613            | RoomSendQueueUpdate::MediaUpload { .. } => {
614                // Nothing to do for these bad boys.
615                return true;
616            }
617        };
618
619        // The user sent an event in this thread; subscribe them to it up to this event.
620        trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");
621        if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await
622        {
623            warn!(%err, "Failed to subscribe to thread");
624        } else {
625            let _ = thread_subscriber_sender.send(());
626        }
627
628        true
629    }
630
631    #[instrument(skip_all)]
632    async fn thread_subscriber_task(
633        client: WeakClient,
634        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
635        thread_subscriber_sender: Sender<()>,
636    ) {
637        let mut send_q_rx = if let Some(client) = client.get() {
638            if !client.enabled_thread_subscriptions() {
639                trace!("Thread subscriptions are not enabled, not spawning thread subscriber task");
640                return;
641            }
642
643            client.send_queue().subscribe()
644        } else {
645            trace!("Client is shutting down, not spawning thread subscriber task");
646            return;
647        };
648
649        let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();
650
651        // A mapping of local echoes (events being sent) to their thread root, if
652        // they're an in-thread reply.
653        //
654        // Entirely managed by `handle_thread_subscriber_send_queue_update`.
655        let mut events_being_sent = HashMap::new();
656
657        loop {
658            select! {
659                res = send_q_rx.recv() => {
660                    match res {
661                        Ok(up) => {
662                            if !Self::handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
663                                break;
664                            }
665                        }
666                        Err(RecvError::Closed) => {
667                            debug!("Send queue channel has been closed, exiting thread subscriber task");
668                            break;
669                        }
670                        Err(RecvError::Lagged(num_skipped)) => {
671                            warn!(num_skipped, "Lagged behind send queue updates");
672                        }
673                    }
674                }
675
676                res = linked_chunk_rx.recv() => {
677                    match res {
678                        Ok(up) => {
679                            if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
680                                break;
681                            }
682                        }
683                        Err(RecvError::Closed) => {
684                            debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
685                            break;
686                        }
687                        Err(RecvError::Lagged(num_skipped)) => {
688                            warn!(num_skipped, "Lagged behind linked chunk updates");
689                        }
690                    }
691                }
692            }
693        }
694    }
695
696    /// Takes a [`TimelineEvent`] and passes it to the [`RoomIndex`] of the
697    /// given room, which will add/remove/edit an event in the index based on
698    /// the event type.
699    #[cfg(feature = "experimental-search")]
700    #[instrument(skip_all)]
701    async fn search_indexing_task(
702        client: WeakClient,
703        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
704    ) {
705        let mut linked_chunk_update_receiver = linked_chunk_update_sender.subscribe();
706
707        loop {
708            match linked_chunk_update_receiver.recv().await {
709                Ok(room_ec_lc_update) => {
710                    let OwnedLinkedChunkId::Room(room_id) =
711                        room_ec_lc_update.linked_chunk_id.clone()
712                    else {
713                        trace!("Received non-room updates, ignoring.");
714                        continue;
715                    };
716
717                    let mut timeline_events = room_ec_lc_update.events().peekable();
718
719                    if timeline_events.peek().is_none() {
720                        continue;
721                    }
722
723                    let Some(client) = client.get() else {
724                        trace!("Client is shutting down, exiting search indexing task");
725                        return;
726                    };
727
728                    let maybe_room_cache = client.event_cache().for_room(&room_id).await;
729                    let Ok((room_cache, _drop_handles)) = maybe_room_cache else {
730                        warn!(for_room = %room_id, "Failed to get RoomEventCache: {maybe_room_cache:?}");
731                        continue;
732                    };
733
734                    let maybe_room = client.get_room(&room_id);
735                    let Some(room) = maybe_room else {
736                        warn!(get_room = %room_id, "Failed to get room while indexing: {maybe_room:?}");
737                        continue;
738                    };
739                    let redaction_rules =
740                        room.clone_info().room_version_rules_or_default().redaction;
741
742                    let mut search_index_guard = client.search_index().lock().await;
743
744                    if let Err(err) = search_index_guard
745                        .bulk_handle_timeline_event(
746                            timeline_events,
747                            &room_cache,
748                            &room_id,
749                            &redaction_rules,
750                        )
751                        .await
752                    {
753                        error!("Failed to handle events for indexing: {err}")
754                    }
755                }
756                Err(RecvError::Closed) => {
757                    debug!(
758                        "Linked chunk update channel has been closed, exiting search indexing task"
759                    );
760                    break;
761                }
762                Err(RecvError::Lagged(num_skipped)) => {
763                    warn!(num_skipped, "Lagged behind linked chunk updates");
764                }
765            }
766        }
767    }
768}
769
770struct EventCacheInner {
771    /// A weak reference to the inner client, useful when trying to get a handle
772    /// on the owning client.
773    client: WeakClient,
774
775    /// Reference to the underlying store.
776    store: EventCacheStoreLock,
777
778    /// A lock used when many rooms must be updated at once.
779    ///
780    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
781    /// ensure that multiple updates will be applied in the correct order, which
782    /// is enforced by taking this lock when handling an update.
783    // TODO: that's the place to add a cross-process lock!
784    multiple_room_updates_lock: Mutex<()>,
785
786    /// Lazily-filled cache of live [`RoomEventCache`], once per room.
787    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,
788
789    /// Handles to keep alive the task listening to updates.
790    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
791
792    /// A sender for notifications that a room *may* need to be auto-shrunk.
793    ///
794    /// Needs to live here, so it may be passed to each [`RoomEventCache`]
795    /// instance.
796    ///
797    /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
798    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,
799
800    /// A sender for room generic updates.
801    ///
802    /// See doc comment of [`RoomEventCacheGenericUpdate`] and
803    /// [`EventCache::subscribe_to_room_generic_updates`].
804    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
805
806    /// A sender for a persisted linked chunk update.
807    ///
808    /// This is used to notify that some linked chunk has persisted some updates
809    /// to a store, during sync or a back-pagination of *any* linked chunk.
810    /// This can be used by observers to look for new events.
811    ///
812    /// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
813    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
814
815    /// A background task listening to room and send queue updates, and
816    /// automatically subscribing the user to threads when needed, based on
817    /// the semantics of MSC4306.
818    ///
819    /// One important constraint is that there is only one such task per
820    /// [`EventCache`], so it does listen to *all* rooms at the same time.
821    _thread_subscriber_task: AbortOnDrop<()>,
822
823    /// A background task listening to room updates, and
824    /// automatically handling search index operations (add/remove/edit)
825    /// depending on the event type.
826    ///
827    /// One important constraint is that there is only one such task per
828    /// [`EventCache`], so it does listen to *all* rooms at the same time.
829    #[cfg(feature = "experimental-search")]
830    _search_indexing_task: AbortOnDrop<()>,
831
832    /// A test helper receiver that will be emitted every time the thread
833    /// subscriber task subscribed to a new thread.
834    ///
835    /// This is helpful for tests to coordinate that a new thread subscription
836    /// has been sent or not.
837    thread_subscriber_receiver: Receiver<()>,
838}
839
840type AutoShrinkChannelPayload = OwnedRoomId;
841
842impl EventCacheInner {
843    fn client(&self) -> Result<Client> {
844        self.client.get().ok_or(EventCacheError::ClientDropped)
845    }
846
847    /// Clears all the rooms' data.
848    async fn clear_all_rooms(&self) -> Result<()> {
849        // Okay, here's where things get complicated.
850        //
851        // On the one hand, `by_room` may include storage for *some* rooms that we know
852        // about, but not *all* of them. Any room that hasn't been loaded in the
853        // client, or touched by a sync, will remain unloaded in memory, so it
854        // will be missing from `self.by_room`. As a result, we need to make
855        // sure that we're hitting the storage backend to *really* clear all the
856        // rooms, including those that haven't been loaded yet.
857        //
858        // On the other hand, one must NOT clear the `by_room` map, because if someone
859        // subscribed to a room update, they would never get any new update for
860        // that room, since re-creating the `RoomEventCache` would create a new,
861        // unrelated sender.
862        //
863        // So we need to *keep* the rooms in `by_room` alive, while clearing them in the
864        // store backend.
865        //
866        // As a result, for a short while, the in-memory linked chunks
867        // will be desynchronized from the storage. We need to be careful then. During
868        // that short while, we don't want *anyone* to touch the linked chunk
869        // (be it in memory or in the storage).
870        //
871        // And since that requirement applies to *any* room in `by_room` at the same
872        // time, we'll have to take the locks for *all* the live rooms, so as to
873        // properly clear the underlying storage.
874        //
875        // At this point, you might be scared about the potential for deadlocking. I am
876        // as well, but I'm convinced we're fine:
877        // 1. the lock for `by_room` is usually held only for a short while, and
878        //    independently of the other two kinds.
879        // 2. the state may acquire the store cross-process lock internally, but only
880        //    while the state's methods are called (so it's always transient). As a
881        //    result, as soon as we've acquired the state locks, the store lock ought to
882        //    be free.
883        // 3. The store lock is held explicitly only in a small scoped area below.
884        // 4. Then the store lock will be held internally when calling `reset()`, but at
885        //    this point it's only held for a short while each time, so rooms will take
886        //    turn to acquire it.
887
888        let rooms = self.by_room.write().await;
889
890        // Collect all the rooms' state locks, first: we can clear the storage only when
891        // nobody will touch it at the same time.
892        let room_locks = join_all(
893            rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
894        )
895        .await;
896
897        // Clear the storage for all the rooms, using the storage facility.
898        self.store.lock().await?.clear_all_linked_chunks().await?;
899
900        // At this point, all the in-memory linked chunks are desynchronized from the
901        // storage. Resynchronize them manually by calling reset(), and
902        // propagate updates to observers.
903        try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move {
904            let updates_as_vector_diffs = state_guard.reset().await?;
905
906            let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
907                diffs: updates_as_vector_diffs,
908                origin: EventsOrigin::Cache,
909            });
910
911            let _ = room
912                .inner
913                .generic_update_sender
914                .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() });
915
916            Ok::<_, EventCacheError>(())
917        }))
918        .await?;
919
920        Ok(())
921    }
922
923    /// Handles a single set of room updates at once.
924    #[instrument(skip(self, updates))]
925    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
926        // First, take the lock that indicates we're processing updates, to avoid
927        // handling multiple updates concurrently.
928        let _lock = {
929            let _timer = timer!("Taking the `multiple_room_updates_lock`");
930            self.multiple_room_updates_lock.lock().await
931        };
932
933        // Note: bnjbvr tried to make this concurrent at some point, but it turned out
934        // to be a performance regression, even for large sync updates. Lacking
935        // time to investigate, this code remains sequential for now. See also
936        // https://github.com/matrix-org/matrix-rust-sdk/pull/5426.
937
938        // Left rooms.
939        for (room_id, left_room_update) in updates.left {
940            let room = self.for_room(&room_id).await?;
941
942            if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
943                // Non-fatal error, try to continue to the next room.
944                error!("handling left room update: {err}");
945            }
946        }
947
948        // Joined rooms.
949        for (room_id, joined_room_update) in updates.joined {
950            trace!(?room_id, "Handling a `JoinedRoomUpdate`");
951
952            let room = self.for_room(&room_id).await?;
953
954            if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
955                // Non-fatal error, try to continue to the next room.
956                error!(%room_id, "handling joined room update: {err}");
957            }
958        }
959
960        // Invited rooms.
961        // TODO: we don't do anything with `updates.invite` at this point.
962
963        Ok(())
964    }
965
966    /// Return a room-specific view over the [`EventCache`].
967    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
968        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
969        // write lock.
970        let by_room_guard = self.by_room.read().await;
971
972        match by_room_guard.get(room_id) {
973            Some(room) => Ok(room.clone()),
974
975            None => {
976                // Slow-path: the entry doesn't exist; let's acquire a write lock.
977                drop(by_room_guard);
978                let mut by_room_guard = self.by_room.write().await;
979
980                // In the meanwhile, some other caller might have obtained write access and done
981                // the same, so check for existence again.
982                if let Some(room) = by_room_guard.get(room_id) {
983                    return Ok(room.clone());
984                }
985
986                let pagination_status =
987                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });
988
989                let Some(client) = self.client.get() else {
990                    return Err(EventCacheError::ClientDropped);
991                };
992
993                let room = client
994                    .get_room(room_id)
995                    .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
996                let room_version_rules = room.clone_info().room_version_rules_or_default();
997
998                let enabled_thread_support = matches!(
999                    client.base_client().threading_support,
1000                    ThreadingSupport::Enabled { .. }
1001                );
1002
1003                let room_state = RoomEventCacheState::new(
1004                    room_id.to_owned(),
1005                    room_version_rules,
1006                    enabled_thread_support,
1007                    self.linked_chunk_update_sender.clone(),
1008                    self.store.clone(),
1009                    pagination_status.clone(),
1010                )
1011                .await?;
1012
1013                let timeline_is_not_empty =
1014                    room_state.room_linked_chunk().revents().next().is_some();
1015
1016                // SAFETY: we must have subscribed before reaching this code, otherwise
1017                // something is very wrong.
1018                let auto_shrink_sender =
1019                    self.auto_shrink_sender.get().cloned().expect(
1020                        "we must have called `EventCache::subscribe()` before calling here.",
1021                    );
1022
1023                let room_event_cache = RoomEventCache::new(
1024                    self.client.clone(),
1025                    room_state,
1026                    pagination_status,
1027                    room_id.to_owned(),
1028                    auto_shrink_sender,
1029                    self.generic_update_sender.clone(),
1030                );
1031
1032                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
1033
1034                // If at least one event has been loaded, it means there is a timeline. Let's
1035                // emit a generic update.
1036                if timeline_is_not_empty {
1037                    let _ = self
1038                        .generic_update_sender
1039                        .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
1040                }
1041
1042                Ok(room_event_cache)
1043            }
1044        }
1045    }
1046}
1047
1048/// The result of a single back-pagination request.
1049#[derive(Debug)]
1050pub struct BackPaginationOutcome {
1051    /// Did the back-pagination reach the start of the timeline?
1052    pub reached_start: bool,
1053
1054    /// All the events that have been returned in the back-pagination
1055    /// request.
1056    ///
1057    /// Events are presented in reverse order: the first element of the vec,
1058    /// if present, is the most "recent" event from the chunk (or
1059    /// technically, the last one in the topological ordering).
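    ///
    /// A small sketch of restoring chronological (oldest-first) order:
    ///
    /// ```ignore
    /// let BackPaginationOutcome { mut events, .. } = outcome;
    /// // `events` is newest-first; reverse it to get oldest-first.
    /// events.reverse();
    /// ```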
1060    pub events: Vec<TimelineEvent>,
1061}
1062
1063/// Represents a timeline update of a room. It hides the details of
1064/// [`RoomEventCacheUpdate`] by being more generic.
1065///
1066/// This is used by [`EventCache::subscribe_to_room_generic_updates`]. Please
1067/// read it to learn more about the motivation behind this type.
1068#[derive(Clone, Debug)]
1069pub struct RoomEventCacheGenericUpdate {
1070    /// The room ID owning the timeline.
1071    pub room_id: OwnedRoomId,
1072}
1073
1074/// An update being triggered when events change in the persisted event cache
1075/// for any room.
1076#[derive(Clone, Debug)]
1077struct RoomEventCacheLinkedChunkUpdate {
1078    /// The linked chunk affected by the update.
1079    linked_chunk_id: OwnedLinkedChunkId,
1080
1081    /// A vector of all the linked chunk updates that happened during this event
1082    /// cache update.
1083    updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
1084}
1085
1086impl RoomEventCacheLinkedChunkUpdate {
1087    /// Return all the new events propagated by this update, in topological
1088    /// order.
1089    pub fn events(self) -> impl DoubleEndedIterator<Item = TimelineEvent> {
1090        use itertools::Either;
1091        self.updates.into_iter().flat_map(|update| match update {
1092            linked_chunk::Update::PushItems { items, .. } => {
1093                Either::Left(Either::Left(items.into_iter()))
1094            }
1095            linked_chunk::Update::ReplaceItem { item, .. } => {
1096                Either::Left(Either::Right(std::iter::once(item)))
1097            }
1098            linked_chunk::Update::RemoveItem { .. }
1099            | linked_chunk::Update::DetachLastItems { .. }
1100            | linked_chunk::Update::StartReattachItems
1101            | linked_chunk::Update::EndReattachItems
1102            | linked_chunk::Update::NewItemsChunk { .. }
1103            | linked_chunk::Update::NewGapChunk { .. }
1104            | linked_chunk::Update::RemoveChunk(..)
1105            | linked_chunk::Update::Clear => {
1106                // All these updates don't contain any new event.
1107                Either::Right(std::iter::empty())
1108            }
1109        })
1110    }
1111}
1112
1113/// An update related to events that happened in a room.
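///
/// A sketch of a typical consumer, matching on updates received from
/// [`RoomEventCache::subscribe`] (marked `ignore`; assumes an existing
/// `room_event_cache`):
///
/// ```ignore
/// let (_initial_events, mut updates) = room_event_cache.subscribe().await;
/// while let Ok(update) = updates.recv().await {
///     match update {
///         RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
///             // Apply the `VectorDiff`s to a local copy of the timeline.
///         }
///         RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
///             // Move the fully-read marker in the UI.
///         }
///         _ => {}
///     }
/// }
/// ```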
1114#[derive(Debug, Clone)]
1115pub enum RoomEventCacheUpdate {
1116    /// The fully read marker has moved to a different event.
1117    MoveReadMarkerTo {
1118        /// Event at which the read marker is now pointing.
1119        event_id: OwnedEventId,
1120    },
1121
1122    /// The members have changed.
1123    UpdateMembers {
1124        /// Collection of ambiguity changes that room member events trigger.
1125        ///
1126        /// This is a map of event ID of the `m.room.member` event to the
1127        /// details of the ambiguity change.
1128        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
1129    },
1130
1131    /// The room has received updates for the timeline as _diffs_.
1132    UpdateTimelineEvents {
1133        /// Diffs to apply to the timeline.
1134        diffs: Vec<VectorDiff<TimelineEvent>>,
1135
1136        /// Where the diffs are coming from.
1137        origin: EventsOrigin,
1138    },
1139
1140    /// The room has received new ephemeral events.
1141    AddEphemeralEvents {
1142        /// XXX: this is temporary, until read receipts are handled in the event
1143        /// cache
1144        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1145    },
1146}
1147
1148/// Indicate where events are coming from.
1149#[derive(Debug, Clone)]
1150pub enum EventsOrigin {
1151    /// Events are coming from a sync.
1152    Sync,
1153
1154    /// Events are coming from pagination.
1155    Pagination,
1156
1157    /// The cause of the change is purely internal to the cache.
1158    Cache,
1159}
1160
1161#[cfg(test)]
1162mod tests {
1163    use std::{ops::Not, sync::Arc, time::Duration};
1164
1165    use assert_matches::assert_matches;
1166    use futures_util::FutureExt as _;
1167    use matrix_sdk_base::{
1168        RoomState,
1169        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
1170        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
1171    };
1172    use matrix_sdk_test::{
1173        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
1174    };
1175    use ruma::{event_id, room_id, serde::Raw, user_id};
1176    use serde_json::json;
1177    use tokio::time::sleep;
1178
1179    use super::{EventCacheError, RoomEventCacheGenericUpdate, RoomEventCacheUpdate};
1180    use crate::test_utils::{
1181        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
1182    };
1183
1184    #[async_test]
1185    async fn test_must_explicitly_subscribe() {
1186        let client = logged_in_client(None).await;
1187
1188        let event_cache = client.event_cache();
1189
1190        // If I create a room event subscriber for a room before subscribing the event
1191        // cache,
1192        let room_id = room_id!("!omelette:fromage.fr");
1193        let result = event_cache.for_room(room_id).await;
1194
1195        // Then it fails, because one must explicitly call `.subscribe()` on the event
1196        // cache.
1197        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
1198    }
1199
1200    #[async_test]
1201    async fn test_uniq_read_marker() {
1202        let client = logged_in_client(None).await;
1203        let room_id = room_id!("!galette:saucisse.bzh");
1204        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1205
1206        let event_cache = client.event_cache();
1207
1208        event_cache.subscribe().unwrap();
1209
1210        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1211        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
1212        let (events, mut stream) = room_event_cache.subscribe().await;
1213
1214        assert!(events.is_empty());
1215
1216        // When sending the same read marker event multiple times,…
1217        let read_marker_event = Raw::from_json_string(
1218            json!({
1219                "content": {
1220                    "event_id": "$crepe:saucisse.bzh"
1221                },
1222                "room_id": "!galette:saucisse.bzh",
1223                "type": "m.fully_read"
1224            })
1225            .to_string(),
1226        )
1227        .unwrap();
1228        let account_data = vec![read_marker_event; 100];
1229
1230        room_event_cache
1231            .inner
1232            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
1233            .await
1234            .unwrap();
1235
1236        // … there's only one read marker update.
1237        assert_matches!(
1238            stream.recv().await.unwrap(),
1239            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
1240        );
1241
1242        assert!(stream.recv().now_or_never().is_none());
1243
1244        // None, because account data doesn't trigger a generic update.
1245        assert!(generic_stream.recv().now_or_never().is_none());
1246    }
1247
1248    #[async_test]
1249    async fn test_get_event_by_id() {
1250        let client = logged_in_client(None).await;
1251        let room_id1 = room_id!("!galette:saucisse.bzh");
1252        let room_id2 = room_id!("!crepe:saucisse.bzh");
1253
1254        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
1255        client.base_client().get_or_create_room(room_id2, RoomState::Joined);
1256
1257        let event_cache = client.event_cache();
1258        event_cache.subscribe().unwrap();
1259
1260        // Insert two rooms with a few events.
1261        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));
1262
1263        let eid1 = event_id!("$1");
1264        let eid2 = event_id!("$2");
1265        let eid3 = event_id!("$3");
1266
1267        let joined_room_update1 = JoinedRoomUpdate {
1268            timeline: Timeline {
1269                events: vec![
1270                    f.text_msg("hey").event_id(eid1).into(),
1271                    f.text_msg("you").event_id(eid2).into(),
1272                ],
1273                ..Default::default()
1274            },
1275            ..Default::default()
1276        };
1277
1278        let joined_room_update2 = JoinedRoomUpdate {
1279            timeline: Timeline {
1280                events: vec![f.text_msg("bjr").event_id(eid3).into()],
1281                ..Default::default()
1282            },
1283            ..Default::default()
1284        };
1285
1286        let mut updates = RoomUpdates::default();
1287        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
1288        updates.joined.insert(room_id2.to_owned(), joined_room_update2);
1289
1290        // Have the event cache handle them.
1291        event_cache.inner.handle_room_updates(updates).await.unwrap();
1292
1293        // We can find the events in a single room.
1294        let room1 = client.get_room(room_id1).unwrap();
1295
1296        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();
1297
1298        let found1 = room_event_cache.find_event(eid1).await.unwrap();
1299        assert_event_matches_msg(&found1, "hey");
1300
1301        let found2 = room_event_cache.find_event(eid2).await.unwrap();
1302        assert_event_matches_msg(&found2, "you");
1303
1304        // Retrieving the event with id3 from the room which doesn't contain it will
1305        // fail…
1306        assert!(room_event_cache.find_event(eid3).await.is_none());
1307    }
1308
1309    #[async_test]
1310    async fn test_save_event() {
1311        let client = logged_in_client(None).await;
1312        let room_id = room_id!("!galette:saucisse.bzh");
1313
1314        let event_cache = client.event_cache();
1315        event_cache.subscribe().unwrap();
1316
1317        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1318        let event_id = event_id!("$1");
1319
1320        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1321        let room = client.get_room(room_id).unwrap();
1322
1323        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1324        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;
1325
1326        // Retrieving the event at the room-wide cache works.
1327        assert!(room_event_cache.find_event(event_id).await.is_some());
1328    }
1329
1330    #[async_test]
1331    async fn test_generic_update_when_loading_rooms() {
1332        // Create 2 rooms. One of them has data in the event cache storage.
1333        let user = user_id!("@mnt_io:matrix.org");
1334        let client = logged_in_client(None).await;
1335        let room_id_0 = room_id!("!raclette:patate.ch");
1336        let room_id_1 = room_id!("!fondue:patate.ch");
1337
1338        let event_factory = EventFactory::new().room(room_id_0).sender(user);
1339
1340        let event_cache = client.event_cache();
1341        event_cache.subscribe().unwrap();
1342
1343        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
1344        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
1345
1346        client
1347            .event_cache_store()
1348            .lock()
1349            .await
1350            .unwrap()
1351            .handle_linked_chunk_updates(
1352                LinkedChunkId::Room(room_id_0),
1353                vec![
1354                    // Non-empty items chunk.
1355                    Update::NewItemsChunk {
1356                        previous: None,
1357                        new: ChunkIdentifier::new(0),
1358                        next: None,
1359                    },
1360                    Update::PushItems {
1361                        at: Position::new(ChunkIdentifier::new(0), 0),
1362                        items: vec![
1363                            event_factory
1364                                .text_msg("hello")
1365                                .sender(user)
1366                                .event_id(event_id!("$ev0"))
1367                                .into_event(),
1368                        ],
1369                    },
1370                ],
1371            )
1372            .await
1373            .unwrap();
1374
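        // Subscribe to coarse-grained, room-level updates; each update only carries
        // the id of the room whose cache changed.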
1375        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1376
1377        // Room 0 has initial data, so it must trigger a generic update.
1378        {
1379            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();
1380
1381            assert_matches!(
1382                generic_stream.recv().await,
1383                Ok(RoomEventCacheGenericUpdate { room_id }) => {
1384                    assert_eq!(room_id, room_id_0);
1385                }
1386            );
1387        }
1388
1389        // Room 1 has NO initial data, so no generic update should be emitted.
1390        {
1391            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();
1392
1393            assert!(generic_stream.recv().now_or_never().is_none());
1394        }
1395    }
1396
1397    #[async_test]
1398    async fn test_generic_update_when_paginating_room() {
1399        // Create 1 room, with 4 chunks in the event cache storage.
1400        let user = user_id!("@mnt_io:matrix.org");
1401        let client = logged_in_client(None).await;
1402        let room_id = room_id!("!raclette:patate.ch");
1403
1404        let event_factory = EventFactory::new().room(room_id).sender(user);
1405
1406        let event_cache = client.event_cache();
1407        event_cache.subscribe().unwrap();
1408
1409        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1410
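        // Seed the store with a linked chunk made of two empty chunks followed by two
        // single-event chunks, so that the back-pagination below proceeds chunk by chunk.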
1411        client
1412            .event_cache_store()
1413            .lock()
1414            .await
1415            .unwrap()
1416            .handle_linked_chunk_updates(
1417                LinkedChunkId::Room(room_id),
1418                vec![
1419                    // Empty chunk.
1420                    Update::NewItemsChunk {
1421                        previous: None,
1422                        new: ChunkIdentifier::new(0),
1423                        next: None,
1424                    },
1425                    // Empty chunk.
1426                    Update::NewItemsChunk {
1427                        previous: Some(ChunkIdentifier::new(0)),
1428                        new: ChunkIdentifier::new(1),
1429                        next: None,
1430                    },
1431                    // Non-empty items chunk.
1432                    Update::NewItemsChunk {
1433                        previous: Some(ChunkIdentifier::new(1)),
1434                        new: ChunkIdentifier::new(2),
1435                        next: None,
1436                    },
1437                    Update::PushItems {
1438                        at: Position::new(ChunkIdentifier::new(2), 0),
1439                        items: vec![
1440                            event_factory
1441                                .text_msg("hello")
1442                                .sender(user)
1443                                .event_id(event_id!("$ev0"))
1444                                .into_event(),
1445                        ],
1446                    },
1447                    // Non-empty items chunk.
1448                    Update::NewItemsChunk {
1449                        previous: Some(ChunkIdentifier::new(2)),
1450                        new: ChunkIdentifier::new(3),
1451                        next: None,
1452                    },
1453                    Update::PushItems {
1454                        at: Position::new(ChunkIdentifier::new(3), 0),
1455                        items: vec![
1456                            event_factory
1457                                .text_msg("world")
1458                                .sender(user)
1459                                .event_id(event_id!("$ev1"))
1460                                .into_event(),
1461                        ],
1462                    },
1463                ],
1464            )
1465            .await
1466            .unwrap();
1467
1468        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1469
1470        // The room is initialised: it gets one event in the timeline.
1471        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1472
1473        assert_matches!(
1474            generic_stream.recv().await,
1475            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1476                assert_eq!(room_id, expected_room_id);
1477            }
1478        );
1479
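        // Back-paginate through the stored chunks. As asserted below, only paginations
        // that surface new events are expected to emit a generic update.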
1480        let pagination = room_event_cache.pagination();
1481
1482        // Paginate: it yields one new event in the timeline.
1483        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();
1484
1485        assert_eq!(pagination_outcome.events.len(), 1);
1486        assert!(pagination_outcome.reached_start.not());
1487        assert_matches!(
1488            generic_stream.recv().await,
1489            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1490                assert_eq!(room_id, expected_room_id);
1491            }
1492        );
1493
1494        // Paginate again: it yields no new events in the timeline.
1495        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();
1496
1497        assert!(pagination_outcome.events.is_empty());
1498        assert!(pagination_outcome.reached_start.not());
1499        assert!(generic_stream.recv().now_or_never().is_none());
1500
1501        // Paginate once more: this time the start of the timeline is reached, confirming our scenario is set up correctly.
1502        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();
1503
1504        assert!(pagination_outcome.reached_start);
1505        assert!(generic_stream.recv().now_or_never().is_none());
1506    }
1507
1508    #[async_test]
1509    async fn test_for_room_when_room_is_not_found() {
1510        let client = logged_in_client(None).await;
1511        let room_id = room_id!("!raclette:patate.ch");
1512
1513        let event_cache = client.event_cache();
1514        event_cache.subscribe().unwrap();
1515
1516        // The room doesn't exist yet, so `for_room` returns an error.
1517        assert_matches!(
1518            event_cache.for_room(room_id).await,
1519            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
1520                assert_eq!(room_id, not_found_room_id);
1521            }
1522        );
1523
1524        // Now create the room.
1525        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1526
1527        // The room exists now, so fetching its event cache succeeds.
1528        assert!(event_cache.for_room(room_id).await.is_ok());
1529    }
1530
1531    /// Test that the event cache does not create reference cycles or tasks that
1532    /// retain a reference to it indefinitely, which would prevent it from being deallocated.
1533    #[cfg(not(target_family = "wasm"))]
1534    #[async_test]
1535    async fn test_no_refcycle_event_cache_tasks() {
1536        let client = MockClientBuilder::new(None).build().await;
1537
1538        // Wait for the init tasks to die.
1539        sleep(Duration::from_secs(1)).await;
1540
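        // Keep only a weak pointer to the inner event cache: if background tasks held
        // strong clones of it, the strong count would not drop to zero later on.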
1541        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
1542        assert_eq!(event_cache_weak.strong_count(), 1);
1543
1544        {
1545            let room_id = room_id!("!room:example.org");
1546
1547            // Make the client aware of the room.
1548            let response = SyncResponseBuilder::default()
1549                .add_joined_room(JoinedRoomBuilder::new(room_id))
1550                .build_sync_response();
1551            client.inner.base_client.receive_sync_response(response).await.unwrap();
1552
1553            client.event_cache().subscribe().unwrap();
1554
1555            let (_room_event_cache, _drop_handles) =
1556                client.get_room(room_id).unwrap().event_cache().await.unwrap();
1557        }
1558
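        // The room event cache and its drop handles went out of scope above; dropping
        // the client should release the remaining strong references.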
1559        drop(client);
1560
1561        // Give a bit of time for background tasks to die.
1562        sleep(Duration::from_secs(1)).await;
1563
1564        // No strong references should remain now that the `Client` has been dropped.
1565        assert_eq!(
1566            event_cache_weak.strong_count(),
1567            0,
1568            "Too many strong references to the event cache: {}",
1569            event_cache_weak.strong_count()
1570        );
1571    }
1572}