matrix_sdk/event_cache/
mod.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The event cache is an abstraction layer, sitting between the Rust SDK and a
//! final client, that acts as a global observer of all the rooms, gathering and
//! inferring some extra useful information about each room. In particular, this
//! doesn't require subscribing to a specific room to get access to this
//! information.
//!
//! It's intended to be fast, robust and easy to maintain, having learned from
//! previous endeavours at implementing middle to high level features elsewhere
//! in the SDK, notably in the UI's Timeline object.
//!
//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
//! details about the historical reasons that led us to start writing this.
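//!
//! A minimal usage sketch, assuming a `client: Client` that has already been
//! built and logged in (error handling elided):
//!
//! ```ignore
//! // Start listening to sync responses; this is cheap and idempotent.
//! client.event_cache().subscribe()?;
//!
//! // Get a room-wide view over the cache, and subscribe to its updates.
//! let room = client.get_room(&room_id).expect("known room");
//! let (room_event_cache, _drop_handles) = room.event_cache().await?;
//! let (initial_events, mut updates) = room_event_cache.subscribe().await;
//!
//! while let Ok(update) = updates.recv().await {
//!     // React to new timeline diffs, read marker moves, etc.
//! }
//! ```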

#![forbid(missing_docs)]

use std::{
    collections::{BTreeMap, HashMap},
    fmt,
    sync::{Arc, OnceLock, Weak},
};

use eyeball::{SharedObservable, Subscriber};
use eyeball_im::VectorDiff;
use futures_util::future::{join_all, try_join_all};
use matrix_sdk_base::{
    ThreadingSupport,
    cross_process_lock::CrossProcessLockError,
    deserialized_responses::{AmbiguityChange, TimelineEvent},
    event_cache::{
        Gap,
        store::{EventCacheStoreError, EventCacheStoreLock},
    },
    executor::AbortOnDrop,
    linked_chunk::{self, OwnedLinkedChunkId, lazy_loader::LazyLoaderError},
    serde_helpers::extract_thread_root_from_content,
    sync::RoomUpdates,
    timer,
};
use matrix_sdk_common::executor::{JoinHandle, spawn};
use room::RoomEventCacheState;
use ruma::{
    OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId, events::AnySyncEphemeralRoomEvent,
    serde::Raw,
};
use tokio::{
    select,
    sync::{
        Mutex, RwLock,
        broadcast::{Receiver, Sender, channel, error::RecvError},
        mpsc,
    },
};
use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, trace, warn};

use crate::{
    Client,
    client::WeakClient,
    send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
};

mod deduplicator;
mod pagination;
mod room;

pub use pagination::{RoomPagination, RoomPaginationStatus};
pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate};

/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] instance hasn't been initialized with
    /// [`EventCache::subscribe`].
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// The room was not found.
    #[error("Room `{room_id}` is not found.")]
    RoomNotFound {
        /// The ID of the room that wasn't found.
        room_id: OwnedRoomId,
    },

    /// An error has been observed while back-paginating.
    #[error(transparent)]
    BackpaginationError(Box<crate::Error>),

    /// Back-pagination was already happening in a given room when we tried to
    /// back-paginate again.
    #[error("We were already back-paginating.")]
    AlreadyBackpaginating,

    /// An error happening when interacting with storage.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happening when attempting to (cross-process) lock storage.
    #[error(transparent)]
    LockingStorage(#[from] CrossProcessLockError),

    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
    /// to. It's possible this weak reference points to nothing anymore, at
    /// times when we try to use the client.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happening when interacting with the [`LinkedChunk`]'s lazy
    /// loader.
    ///
    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),

    /// An error happened when reading the metadata of a linked chunk, upon
    /// reload.
    #[error("the linked chunk metadata is invalid: {details}")]
    InvalidLinkedChunkMetadata {
        /// A string containing details about the error.
        details: String,
    },
}

/// A result using the [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;

/// Holds handles to the tasks spawned by an [`EventCache`].
pub struct EventCacheDropHandles {
    /// Task that listens to room updates.
    listen_updates_task: JoinHandle<()>,

    /// Task that listens to updates to the user's ignored list.
    ignore_user_list_update_task: JoinHandle<()>,

    /// The task used to automatically shrink the linked chunks.
    auto_shrink_linked_chunk_task: JoinHandle<()>,
}

impl fmt::Debug for EventCacheDropHandles {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
    }
}

impl Drop for EventCacheDropHandles {
    fn drop(&mut self) {
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
        self.auto_shrink_linked_chunk_task.abort();
    }
}

/// An event cache, providing lots of useful functionality for clients.
///
/// Cloning is shallow, and thus is cheap to do.
///
/// See also the module-level comment.
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache.
    inner: Arc<EventCacheInner>,
}

impl fmt::Debug for EventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventCache").finish_non_exhaustive()
    }
}

impl EventCache {
    /// Create a new [`EventCache`] for the given client.
    pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
        let (generic_update_sender, _) = channel(32);
        let (linked_chunk_update_sender, _) = channel(32);

        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(32);
        let thread_subscriber_task = AbortOnDrop::new(spawn(Self::thread_subscriber_task(
            client.clone(),
            linked_chunk_update_sender.clone(),
            thread_subscriber_sender,
        )));

        #[cfg(feature = "experimental-search")]
        let search_indexing_task = AbortOnDrop::new(spawn(Self::search_indexing_task(
            client.clone(),
            linked_chunk_update_sender.clone(),
        )));

        Self {
            inner: Arc::new(EventCacheInner {
                client,
                store: event_cache_store,
                multiple_room_updates_lock: Default::default(),
                by_room: Default::default(),
                drop_handles: Default::default(),
                auto_shrink_sender: Default::default(),
                generic_update_sender,
                linked_chunk_update_sender,
                _thread_subscriber_task: thread_subscriber_task,
                #[cfg(feature = "experimental-search")]
                _search_indexing_task: search_indexing_task,
                thread_subscriber_receiver,
            }),
        }
    }

    /// Subscribes to notifications that a thread subscription has been sent.
    ///
    /// For testing purposes only.
    #[doc(hidden)]
    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
        self.inner.thread_subscriber_receiver.resubscribe()
    }

    /// Starts subscribing the [`EventCache`] to sync responses, if not done
    /// before.
    ///
    /// Re-running this has no effect if we already subscribed before, and is
    /// cheap.
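    ///
    /// A short usage sketch (assuming a `client: Client` obtained elsewhere):
    ///
    /// ```ignore
    /// let event_cache = client.event_cache();
    /// // Must be called before `for_room` and friends, which otherwise return
    /// // `EventCacheError::NotSubscribedYet`.
    /// event_cache.subscribe()?;
    /// assert!(event_cache.has_subscribed());
    /// ```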
    pub fn subscribe(&self) -> Result<()> {
        let client = self.inner.client()?;

        // Initialize the drop handles.
        let _ = self.inner.drop_handles.get_or_init(|| {
            // Spawn the task that will listen to all the room updates at once.
            let listen_updates_task = spawn(Self::listen_task(
                self.inner.clone(),
                client.subscribe_to_all_room_updates(),
            ));

            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
                self.inner.clone(),
                client.subscribe_to_ignore_user_list_changes(),
            ));

            let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);

            // Force-initialize the sender in the [`EventCacheInner`].
            self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);

            let auto_shrink_linked_chunk_task = spawn(Self::auto_shrink_linked_chunk_task(
                Arc::downgrade(&self.inner),
                auto_shrink_receiver,
            ));

            Arc::new(EventCacheDropHandles {
                listen_updates_task,
                ignore_user_list_update_task,
                auto_shrink_linked_chunk_task,
            })
        });

        Ok(())
    }

    #[instrument(skip_all)]
    async fn ignore_user_list_update_task(
        inner: Arc<EventCacheInner>,
        mut ignore_user_list_stream: Subscriber<Vec<String>>,
    ) {
        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
        span.follows_from(Span::current());

        async move {
            while ignore_user_list_stream.next().await.is_some() {
                info!("Received an ignore user list change");
                if let Err(err) = inner.clear_all_rooms().await {
                    error!("when clearing room storage after ignore user list change: {err}");
                }
            }
            info!("Ignore user list stream has closed");
        }
        .instrument(span)
        .await;
    }

    /// For benchmarking purposes only.
    #[doc(hidden)]
    pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        self.inner.handle_room_updates(updates).await
    }

    #[instrument(skip_all)]
    async fn listen_task(
        inner: Arc<EventCacheInner>,
        mut room_updates_feed: Receiver<RoomUpdates>,
    ) {
        trace!("Spawning the listen task");
        loop {
            match room_updates_feed.recv().await {
                Ok(updates) => {
                    trace!("Receiving `RoomUpdates`");

                    if let Err(err) = inner.handle_room_updates(updates).await {
                        match err {
                            EventCacheError::ClientDropped => {
                                // The client has dropped, exit the listen task.
                                info!(
                                    "Closing the event cache global listen task because client dropped"
                                );
                                break;
                            }
                            err => {
                                error!("Error when handling room updates: {err}");
                            }
                        }
                    }
                }

                Err(RecvError::Lagged(num_skipped)) => {
                    // Forget everything we know; we could have missed events, and we have
                    // no way to reconcile at the moment!
                    // TODO: implement Smart Matching™.
                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
                    if let Err(err) = inner.clear_all_rooms().await {
                        error!("when clearing storage after lag in listen_task: {err}");
                    }
                }

                Err(RecvError::Closed) => {
                    // The sender has shut down, exit.
                    info!("Closing the event cache global listen task because receiver closed");
                    break;
                }
            }
        }
    }

    /// Spawns the task that will listen to auto-shrink notifications.
    ///
    /// The auto-shrink mechanism works this way:
    ///
    /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
    ///   increment the active number of subscribers to that room, aka
    ///   [`RoomEventCacheState::subscriber_count`].
    /// - When that subscriber is dropped, it will decrement that count, and
    ///   notify the task below if it reached 0 (see the sketch after this
    ///   list).
    /// - The task spawned here, owned by the [`EventCacheInner`], will listen
    ///   to such notifications that a room may be shrunk. It will attempt an
    ///   auto-shrink, by letting the inner state decide whether this is a good
    ///   time to do so (new subscribers might have spawned in the meantime).
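    ///
    /// A rough sketch of the notification side, assuming an illustrative
    /// subscriber guard owning the per-room counter and the sender (the names
    /// below are hypothetical, not the actual fields):
    ///
    /// ```ignore
    /// impl Drop for SubscriberGuard {
    ///     fn drop(&mut self) {
    ///         // Last subscriber for this room? Ask the auto-shrink task to
    ///         // consider shrinking its linked chunk.
    ///         if self.subscriber_count.fetch_sub(1, Ordering::SeqCst) == 1 {
    ///             let _ = self.auto_shrink_sender.try_send(self.room_id.clone());
    ///         }
    ///     }
    /// }
    /// ```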
    #[instrument(skip_all)]
    async fn auto_shrink_linked_chunk_task(
        inner: Weak<EventCacheInner>,
        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
    ) {
        while let Some(room_id) = rx.recv().await {
            trace!(for_room = %room_id, "received notification to shrink");

            let Some(inner) = inner.upgrade() else {
                return;
            };

            let room = match inner.for_room(&room_id).await {
                Ok(room) => room,
                Err(err) => {
                    warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
                    continue;
                }
            };

            trace!("waiting for state lock…");
            let mut state = room.inner.state.write().await;

            match state.auto_shrink_if_no_subscribers().await {
                Ok(diffs) => {
                    if let Some(diffs) = diffs {
                        // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
                        // subscribers, right? RIGHT? Especially because the state is guarded behind
                        // a lock.
                        //
                        // However, better safe than sorry, and it's cheap to send an update here,
                        // so let's do it!
                        if !diffs.is_empty() {
                            let _ = room.inner.sender.send(
                                RoomEventCacheUpdate::UpdateTimelineEvents {
                                    diffs,
                                    origin: EventsOrigin::Cache,
                                },
                            );
                        }
                    } else {
                        debug!("auto-shrinking didn't happen");
                    }
                }

                Err(err) => {
                    // There's not much we can do here, unfortunately.
                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
                }
            }
        }

        info!("Auto-shrink linked chunk task has been closed, exiting");
    }

    /// Check whether [`EventCache::subscribe`] has been called.
    pub fn has_subscribed(&self) -> bool {
        self.inner.drop_handles.get().is_some()
    }

    /// Return a room-specific view over the [`EventCache`].
    pub(crate) async fn for_room(
        &self,
        room_id: &RoomId,
    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
            return Err(EventCacheError::NotSubscribedYet);
        };

        let room = self.inner.for_room(room_id).await?;

        Ok((room, drop_handles))
    }

    /// Cleanly clear all the rooms' event caches.
    ///
    /// This will notify any live observers that their room has been cleared.
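    ///
    /// A one-line sketch (assuming an `event_cache` handle):
    ///
    /// ```ignore
    /// event_cache.clear_all_rooms().await?;
    /// ```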
    pub async fn clear_all_rooms(&self) -> Result<()> {
        self.inner.clear_all_rooms().await
    }

    /// Subscribe to room _generic_ updates.
    ///
    /// If one wants to listen to what has changed in a specific room,
    /// [`RoomEventCache::subscribe`] is recommended. Note however that the
    /// [`RoomEventCacheSubscriber`] type triggers side-effects.
    ///
    /// If one wants a high-level overview, i.e. generic updates for all rooms,
    /// without side-effects, this method is recommended instead. Dropping the
    /// receiver of this channel will not trigger any side-effect.
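    ///
    /// A short sketch of listening to these updates (assuming an `event_cache`
    /// that has already subscribed to sync responses):
    ///
    /// ```ignore
    /// let mut generic_updates = event_cache.subscribe_to_room_generic_updates();
    ///
    /// while let Ok(RoomEventCacheGenericUpdate { room_id }) = generic_updates.recv().await {
    ///     // Something changed in `room_id`; query its room cache if needed.
    /// }
    /// ```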
    pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
        self.inner.generic_update_sender.subscribe()
    }

    /// React to a given linked chunk update by subscribing the user to a
    /// thread, if need be (when the user got mentioned in a thread reply, for
    /// a thread they were not subscribed to).
    ///
    /// Returns a boolean indicating whether the task should keep on running or
    /// not.
    #[instrument(skip(client, thread_subscriber_sender))]
    async fn handle_thread_subscriber_linked_chunk_update(
        client: &WeakClient,
        thread_subscriber_sender: &Sender<()>,
        up: RoomEventCacheLinkedChunkUpdate,
    ) -> bool {
        let Some(client) = client.get() else {
            // Client shutting down.
            debug!("Client is shutting down, exiting thread subscriber task");
            return false;
        };

        let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
            trace!("received an update for a non-thread linked chunk, ignoring");
            return true;
        };

        let Some(room) = client.get_room(room_id) else {
            warn!(%room_id, "unknown room");
            return true;
        };

        let thread_root = thread_root.clone();

        let mut new_events = up.events().peekable();

        if new_events.peek().is_none() {
            // No new events, nothing to do.
            return true;
        }

        // This `PushContext` is going to be used to compute whether an in-thread event
        // would trigger a mention.
        //
        // Of course, we're not interested in an in-thread event causing a mention
        // merely because it's part of a thread we've already subscribed to. So the
        // `PushContext` must not include the check for thread subscriptions (otherwise
        // it would be impossible to subscribe to new threads).

        let with_thread_subscriptions = false;

        let Some(push_context) = room
            .push_context_internal(with_thread_subscriptions)
            .await
            .inspect_err(|err| {
                warn!("Failed to get push context for threads: {err}");
            })
            .ok()
            .flatten()
        else {
            warn!("Missing push context for thread subscriptions.");
            return true;
        };

        let mut subscribe_up_to = None;

        // Find if there's an event that would trigger a mention for the current
        // user, iterating from the end of the new events towards the oldest, so we can
        // find the most recent event to subscribe to.
        for ev in new_events.rev() {
            if push_context
                .for_event(ev.raw())
                .await
                .into_iter()
                .any(|action| action.should_notify())
            {
                let Some(event_id) = ev.event_id() else {
                    // Shouldn't happen.
                    continue;
                };
                subscribe_up_to = Some(event_id);
                break;
            }
        }

        // And if we've found such a mention, subscribe to the thread up to this
        // event.
        if let Some(event_id) = subscribe_up_to {
            trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
            if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
                warn!(%err, "Failed to subscribe to thread");
            } else {
                let _ = thread_subscriber_sender.send(());
            }
        }

        true
    }

    /// React to a given send queue update by subscribing the user to a
    /// thread, if need be (when the user sent an event in a thread they were
    /// not subscribed to).
    ///
    /// Returns a boolean indicating whether the task should keep on running or
    /// not.
    #[instrument(skip(client, thread_subscriber_sender))]
    async fn handle_thread_subscriber_send_queue_update(
        client: &WeakClient,
        thread_subscriber_sender: &Sender<()>,
        events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
        up: SendQueueUpdate,
    ) -> bool {
        let Some(client) = client.get() else {
            // Client shutting down.
            debug!("Client is shutting down, exiting thread subscriber task");
            return false;
        };

        let room_id = up.room_id;
        let Some(room) = client.get_room(&room_id) else {
            warn!(%room_id, "unknown room");
            return true;
        };

        let (thread_root, subscribe_up_to) = match up.update {
            RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
                match local_echo.content {
                    LocalEchoContent::Event { serialized_event, .. } => {
                        if let Some(thread_root) =
                            extract_thread_root_from_content(serialized_event.into_raw().0)
                        {
                            events_being_sent.insert(local_echo.transaction_id, thread_root);
                        }
                    }
                    LocalEchoContent::React { .. } => {
                        // Nothing to do, reactions don't count as a thread
                        // subscription.
                    }
                }
                return true;
            }

            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
                events_being_sent.remove(&transaction_id);
                return true;
            }

            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
                if let Some(thread_root) =
                    extract_thread_root_from_content(new_content.into_raw().0)
                {
                    events_being_sent.insert(transaction_id, thread_root);
                } else {
                    // It could be that the event isn't part of a thread anymore; handle that by
                    // removing the pending transaction id.
                    events_being_sent.remove(&transaction_id);
                }
                return true;
            }

            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
                if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
                    (thread_root, event_id)
                } else {
                    // We don't know about the event that has been sent, so ignore it.
                    trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
                    return true;
                }
            }

            RoomSendQueueUpdate::SendError { .. }
            | RoomSendQueueUpdate::RetryEvent { .. }
            | RoomSendQueueUpdate::MediaUpload { .. } => {
                // Nothing to do for these bad boys.
                return true;
            }
        };

        // We've found an in-thread event sent by the user, so subscribe to the thread
        // up to this event.
        trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");
        if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await
        {
            warn!(%err, "Failed to subscribe to thread");
        } else {
            let _ = thread_subscriber_sender.send(());
        }

        true
    }

    #[instrument(skip_all)]
    async fn thread_subscriber_task(
        client: WeakClient,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
        thread_subscriber_sender: Sender<()>,
    ) {
        let mut send_q_rx = if let Some(client) = client.get() {
            if !client.enabled_thread_subscriptions() {
                trace!("Thread subscriptions are not enabled, not spawning thread subscriber task");
                return;
            }

            client.send_queue().subscribe()
        } else {
            trace!("Client is shutting down, not spawning thread subscriber task");
            return;
        };

        let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();

        // A mapping of local echoes (events being sent), to their thread root, if
        // they're an in-thread reply.
        //
        // Entirely managed by `handle_thread_subscriber_send_queue_update`.
        let mut events_being_sent = HashMap::new();

        loop {
            select! {
                res = send_q_rx.recv() => {
                    match res {
                        Ok(up) => {
                            if !Self::handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
                                break;
                            }
                        }
                        Err(RecvError::Closed) => {
                            debug!("Send queue update channel has been closed, exiting thread subscriber task");
                            break;
                        }
                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(num_skipped, "Lagged behind send queue updates");
                        }
                    }
                }

                res = linked_chunk_rx.recv() => {
                    match res {
                        Ok(up) => {
                            if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
                                break;
                            }
                        }
                        Err(RecvError::Closed) => {
                            debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
                            break;
                        }
                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(num_skipped, "Lagged behind linked chunk updates");
                        }
                    }
                }
            }
        }
    }

    /// Takes a [`TimelineEvent`] and passes it to the [`RoomIndex`] of the
    /// given room, which will add/remove/edit an event in the index based on
    /// the event type.
    #[cfg(feature = "experimental-search")]
    #[instrument(skip_all)]
    async fn search_indexing_task(
        client: WeakClient,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
    ) {
        let mut linked_chunk_update_receiver = linked_chunk_update_sender.subscribe();

        loop {
            match linked_chunk_update_receiver.recv().await {
                Ok(room_ec_lc_update) => {
                    let OwnedLinkedChunkId::Room(room_id) =
                        room_ec_lc_update.linked_chunk_id.clone()
                    else {
                        trace!("Received non-room updates, ignoring.");
                        continue;
                    };

                    let mut timeline_events = room_ec_lc_update.events().peekable();

                    if timeline_events.peek().is_none() {
                        continue;
                    }

                    let Some(client) = client.get() else {
                        trace!("Client is shutting down, exiting search indexing task");
                        return;
                    };

                    let maybe_room_cache = client.event_cache().for_room(&room_id).await;
                    let Ok((room_cache, _drop_handles)) = maybe_room_cache else {
                        warn!(for_room = %room_id, "Failed to get RoomEventCache: {maybe_room_cache:?}");
                        continue;
                    };

                    let maybe_room = client.get_room(&room_id);
                    let Some(room) = maybe_room else {
                        warn!(get_room = %room_id, "Failed to get room while indexing: {maybe_room:?}");
                        continue;
                    };
                    let redaction_rules =
                        room.clone_info().room_version_rules_or_default().redaction;

                    let mut search_index_guard = client.search_index().lock().await;

                    if let Err(err) = search_index_guard
                        .bulk_handle_timeline_event(
                            timeline_events,
                            &room_cache,
                            &room_id,
                            &redaction_rules,
                        )
                        .await
                    {
                        error!("Failed to handle events for indexing: {err}")
                    }
                }
                Err(RecvError::Closed) => {
                    debug!(
                        "Linked chunk update channel has been closed, exiting search indexing task"
                    );
                    break;
                }
                Err(RecvError::Lagged(num_skipped)) => {
                    warn!(num_skipped, "Lagged behind linked chunk updates");
                }
            }
        }
    }
}

struct EventCacheInner {
    /// A weak reference to the inner client, useful when trying to get a handle
    /// on the owning client.
    client: WeakClient,

    /// Reference to the underlying store.
    store: EventCacheStoreLock,

    /// A lock used when many rooms must be updated at once.
    ///
    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
    /// ensure that multiple updates will be applied in the correct order, which
    /// is enforced by taking this lock when handling an update.
    // TODO: that's the place to add a cross-process lock!
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-filled cache of live [`RoomEventCache`], once per room.
    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,

    /// Handles to keep alive the task listening to updates.
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// A sender for notifications that a room *may* need to be auto-shrunk.
    ///
    /// Needs to live here, so it may be passed to each [`RoomEventCache`]
    /// instance.
    ///
    /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,

    /// A sender for room generic updates.
    ///
    /// See doc comment of [`RoomEventCacheGenericUpdate`] and
    /// [`EventCache::subscribe_to_room_generic_updates`].
    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

    /// A sender for persisted linked chunk updates.
    ///
    /// This is used to notify that some linked chunk has persisted some updates
    /// to a store, during sync or a back-pagination of *any* linked chunk.
    /// This can be used by observers to look for new events.
    ///
    /// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// A background task listening to room and send queue updates, and
    /// automatically subscribing the user to threads when needed, based on
    /// the semantics of MSC4306.
    ///
    /// One important constraint is that there is only one such task per
    /// [`EventCache`], so it does listen to *all* rooms at the same time.
    _thread_subscriber_task: AbortOnDrop<()>,

    /// A background task listening to room updates, and
    /// automatically handling search index operations (add/remove/edit)
    /// depending on the event type.
    ///
    /// One important constraint is that there is only one such task per
    /// [`EventCache`], so it does listen to *all* rooms at the same time.
    #[cfg(feature = "experimental-search")]
    _search_indexing_task: AbortOnDrop<()>,

    /// A test helper receiver that will receive a message every time the
    /// thread subscriber task subscribed to a new thread.
    ///
    /// This is helpful for tests to coordinate that a new thread subscription
    /// has been sent or not.
    thread_subscriber_receiver: Receiver<()>,
}

type AutoShrinkChannelPayload = OwnedRoomId;

impl EventCacheInner {
    fn client(&self) -> Result<Client> {
        self.client.get().ok_or(EventCacheError::ClientDropped)
    }

    /// Clears all the rooms' data.
    async fn clear_all_rooms(&self) -> Result<()> {
        // Okay, here's where things get complicated.
        //
        // On the one hand, `by_room` may include storage for *some* rooms that we know
        // about, but not *all* of them. Any room that hasn't been loaded in the
        // client, or touched by a sync, will remain unloaded in memory, so it
        // will be missing from `self.by_room`. As a result, we need to make
        // sure that we're hitting the storage backend to *really* clear all the
        // rooms, including those that haven't been loaded yet.
        //
        // On the other hand, one must NOT clear the `by_room` map, because if someone
        // subscribed to a room update, they would never get any new update for
        // that room, since re-creating the `RoomEventCache` would create a new,
        // unrelated sender.
        //
        // So we need to *keep* the rooms in `by_room` alive, while clearing them in the
        // store backend.
        //
        // As a result, for a short while, the in-memory linked chunks
        // will be desynchronized from the storage. We need to be careful then. During
        // that short while, we don't want *anyone* to touch the linked chunk
        // (be it in memory or in the storage).
        //
        // And since that requirement applies to *any* room in `by_room` at the same
        // time, we'll have to take the locks for *all* the live rooms, so as to
        // properly clear the underlying storage.
        //
        // At this point, you might be scared about the potential for deadlocking. I am
        // as well, but I'm convinced we're fine:
        // 1. the lock for `by_room` is usually held only for a short while, and
        //    independently of the other two kinds.
        // 2. the state may acquire the store cross-process lock internally, but only
        //    while the state's methods are called (so it's always transient). As a
        //    result, as soon as we've acquired the state locks, the store lock ought to
        //    be free.
        // 3. The store lock is held explicitly only in a small scoped area below.
        // 4. Then the store lock will be held internally when calling `reset()`, but at
        //    this point it's only held for a short while each time, so rooms will take
        //    turns to acquire it.

        let rooms = self.by_room.write().await;

        // Collect all the rooms' state locks, first: we can clear the storage only when
        // nobody will touch it at the same time.
        let room_locks = join_all(
            rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
        )
        .await;

        // Clear the storage for all the rooms, using the storage facility.
        self.store.lock().await?.clear_all_linked_chunks().await?;

        // At this point, all the in-memory linked chunks are desynchronized from the
        // storage. Resynchronize them manually by calling reset(), and
        // propagate updates to observers.
        try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move {
            let updates_as_vector_diffs = state_guard.reset().await?;

            let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: updates_as_vector_diffs,
                origin: EventsOrigin::Cache,
            });

            let _ = room
                .inner
                .generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() });

            Ok::<_, EventCacheError>(())
        }))
        .await?;

        Ok(())
    }

    /// Handles a single set of room updates at once.
    #[instrument(skip(self, updates))]
    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        // First, take the lock that indicates we're processing updates, to avoid
        // handling multiple updates concurrently.
        let _lock = {
            let _timer = timer!("Taking the `multiple_room_updates_lock`");
            self.multiple_room_updates_lock.lock().await
        };

        // Note: bnjbvr tried to make this concurrent at some point, but it turned out
        // to be a performance regression, even for large sync updates. Lacking
        // time to investigate, this code remains sequential for now. See also
        // https://github.com/matrix-org/matrix-rust-sdk/pull/5426.

        // Left rooms.
        for (room_id, left_room_update) in updates.left {
            let room = self.for_room(&room_id).await?;

            if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
                // Non-fatal error, try to continue to the next room.
                error!("handling left room update: {err}");
            }
        }

        // Joined rooms.
        for (room_id, joined_room_update) in updates.joined {
            trace!(?room_id, "Handling a `JoinedRoomUpdate`");

            let room = self.for_room(&room_id).await?;

            if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
                // Non-fatal error, try to continue to the next room.
                error!(%room_id, "handling joined room update: {err}");
            }
        }

        // Invited rooms.
        // TODO: we don't do anything with `updates.invite` at this point.

        Ok(())
    }

    /// Return a room-specific view over the [`EventCache`].
    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
        // write lock.
        let by_room_guard = self.by_room.read().await;

        match by_room_guard.get(room_id) {
            Some(room) => Ok(room.clone()),

            None => {
                // Slow path: the entry doesn't exist; let's acquire a write lock.
                drop(by_room_guard);
                let mut by_room_guard = self.by_room.write().await;

                // In the meanwhile, some other caller might have obtained write access and done
                // the same, so check for existence again.
                if let Some(room) = by_room_guard.get(room_id) {
                    return Ok(room.clone());
                }

                let pagination_status =
                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });

                let Some(client) = self.client.get() else {
                    return Err(EventCacheError::ClientDropped);
                };

                let room = client
                    .get_room(room_id)
                    .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
                let room_version_rules = room.clone_info().room_version_rules_or_default();

                let enabled_thread_support = matches!(
                    client.base_client().threading_support,
                    ThreadingSupport::Enabled { .. }
                );

                let room_state = RoomEventCacheState::new(
                    room_id.to_owned(),
                    room_version_rules,
                    enabled_thread_support,
                    self.linked_chunk_update_sender.clone(),
                    self.store.clone(),
                    pagination_status.clone(),
                )
                .await?;

                let timeline_is_not_empty =
                    room_state.room_linked_chunk().revents().next().is_some();

                // SAFETY: we must have subscribed before reaching this code, otherwise
                // something is very wrong.
                let auto_shrink_sender =
                    self.auto_shrink_sender.get().cloned().expect(
                        "we must have called `EventCache::subscribe()` before calling here.",
                    );

                let room_event_cache = RoomEventCache::new(
                    self.client.clone(),
                    room_state,
                    pagination_status,
                    room_id.to_owned(),
                    auto_shrink_sender,
                    self.generic_update_sender.clone(),
                );

                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());

                // If at least one event has been loaded, it means there is a timeline. Let's
                // emit a generic update.
                if timeline_is_not_empty {
                    let _ = self
                        .generic_update_sender
                        .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
                }

                Ok(room_event_cache)
            }
        }
    }
}

/// The result of a single back-pagination request.
#[derive(Debug)]
pub struct BackPaginationOutcome {
    /// Did the back-pagination reach the start of the timeline?
    pub reached_start: bool,

    /// All the events that have been returned in the back-pagination
    /// request.
    ///
    /// Events are presented in reverse order: the first element of the vec,
    /// if present, is the most "recent" event from the chunk (or
    /// technically, the last one in the topological ordering).
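    ///
    /// For instance, to process them in chronological (topological) order, one
    /// can iterate in reverse (a sketch, assuming an `outcome` value):
    ///
    /// ```ignore
    /// for event in outcome.events.iter().rev() {
    ///     // `event` now goes from oldest to most recent.
    /// }
    /// ```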
    pub events: Vec<TimelineEvent>,
}

/// Represents a timeline update of a room. It hides the details of
/// [`RoomEventCacheUpdate`] by being more generic.
///
/// This is used by [`EventCache::subscribe_to_room_generic_updates`]. Please
/// read its documentation to learn more about the motivation behind this type.
#[derive(Clone, Debug)]
pub struct RoomEventCacheGenericUpdate {
    /// The room ID owning the timeline.
    pub room_id: OwnedRoomId,
}

/// An update being triggered when events change in the persisted event cache
/// for any room.
#[derive(Clone, Debug)]
struct RoomEventCacheLinkedChunkUpdate {
    /// The linked chunk affected by the update.
    linked_chunk_id: OwnedLinkedChunkId,

    /// A vector of all the linked chunk updates that happened during this event
    /// cache update.
    updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
}

impl RoomEventCacheLinkedChunkUpdate {
    /// Return all the new events propagated by this update, in topological
    /// order.
    pub fn events(self) -> impl DoubleEndedIterator<Item = TimelineEvent> {
        use itertools::Either;
        self.updates.into_iter().flat_map(|update| match update {
            linked_chunk::Update::PushItems { items, .. } => {
                Either::Left(Either::Left(items.into_iter()))
            }
            linked_chunk::Update::ReplaceItem { item, .. } => {
                Either::Left(Either::Right(std::iter::once(item)))
            }
            linked_chunk::Update::RemoveItem { .. }
            | linked_chunk::Update::DetachLastItems { .. }
            | linked_chunk::Update::StartReattachItems
            | linked_chunk::Update::EndReattachItems
            | linked_chunk::Update::NewItemsChunk { .. }
            | linked_chunk::Update::NewGapChunk { .. }
            | linked_chunk::Update::RemoveChunk(..)
            | linked_chunk::Update::Clear => {
                // None of these updates contain any new event.
                Either::Right(std::iter::empty())
            }
        })
    }
}

/// An update related to events that happened in a room.
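///
/// A sketch of consuming these updates from a [`RoomEventCache`] subscription
/// (assuming `stream` is the receiver returned by [`RoomEventCache::subscribe`]):
///
/// ```ignore
/// while let Ok(update) = stream.recv().await {
///     match update {
///         RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
///             // Apply the `VectorDiff`s to a local copy of the timeline.
///         }
///         RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
///             // Move the fully-read marker.
///         }
///         _ => {}
///     }
/// }
/// ```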
1119#[derive(Debug, Clone)]
1120pub enum RoomEventCacheUpdate {
1121    /// The fully read marker has moved to a different event.
1122    MoveReadMarkerTo {
1123        /// Event at which the read marker is now pointing.
1124        event_id: OwnedEventId,
1125    },
1126
1127    /// The members have changed.
1128    UpdateMembers {
1129        /// Collection of ambiguity changes that room member events trigger.
1130        ///
1131        /// This is a map of event ID of the `m.room.member` event to the
1132        /// details of the ambiguity change.
1133        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
1134    },
1135
1136    /// The room has received updates for the timeline as _diffs_.
1137    UpdateTimelineEvents {
1138        /// Diffs to apply to the timeline.
1139        diffs: Vec<VectorDiff<TimelineEvent>>,
1140
1141        /// Where the diffs are coming from.
1142        origin: EventsOrigin,
1143    },
1144
1145    /// The room has received new ephemeral events.
1146    AddEphemeralEvents {
1147        /// XXX: this is temporary, until read receipts are handled in the event
1148        /// cache
1149        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1150    },
1151}
1152
1153/// Indicate where events are coming from.
1154#[derive(Debug, Clone)]
1155pub enum EventsOrigin {
1156    /// Events are coming from a sync.
1157    Sync,
1158
1159    /// Events are coming from pagination.
1160    Pagination,
1161
1162    /// The cause of the change is purely internal to the cache.
1163    Cache,
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168    use std::{ops::Not, sync::Arc, time::Duration};
1169
1170    use assert_matches::assert_matches;
1171    use futures_util::FutureExt as _;
1172    use matrix_sdk_base::{
1173        RoomState,
1174        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
1175        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
1176    };
1177    use matrix_sdk_test::{
1178        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
1179    };
1180    use ruma::{event_id, room_id, serde::Raw, user_id};
1181    use serde_json::json;
1182    use tokio::time::sleep;
1183
1184    use super::{EventCacheError, RoomEventCacheGenericUpdate, RoomEventCacheUpdate};
1185    use crate::test_utils::{
1186        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
1187    };
1188
1189    #[async_test]
1190    async fn test_must_explicitly_subscribe() {
1191        let client = logged_in_client(None).await;
1192
1193        let event_cache = client.event_cache();
1194
1195        // If I create a room event subscriber for a room before subscribing the event
1196        // cache,
1197        let room_id = room_id!("!omelette:fromage.fr");
1198        let result = event_cache.for_room(room_id).await;
1199
1200        // Then it fails, because one must explicitly call `.subscribe()` on the event
1201        // cache.
1202        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
1203    }
1204
1205    #[async_test]
1206    async fn test_uniq_read_marker() {
1207        let client = logged_in_client(None).await;
1208        let room_id = room_id!("!galette:saucisse.bzh");
1209        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1210
1211        let event_cache = client.event_cache();
1212
1213        event_cache.subscribe().unwrap();
1214
1215        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1216        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
1217        let (events, mut stream) = room_event_cache.subscribe().await;
1218
1219        assert!(events.is_empty());
1220
1221        // When sending multiple times the same read marker event,…
1222        let read_marker_event = Raw::from_json_string(
1223            json!({
1224                "content": {
1225                    "event_id": "$crepe:saucisse.bzh"
1226                },
1227                "room_id": "!galette:saucisse.bzh",
1228                "type": "m.fully_read"
1229            })
1230            .to_string(),
1231        )
1232        .unwrap();
1233        let account_data = vec![read_marker_event; 100];
1234
1235        room_event_cache
1236            .inner
1237            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
1238            .await
1239            .unwrap();
1240
1241        // … there's only one read marker update.
1242        assert_matches!(
1243            stream.recv().await.unwrap(),
1244            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
1245        );
1246
1247        assert!(stream.recv().now_or_never().is_none());
1248
1249        // None, because an account data doesn't trigger a generic update.
1250        assert!(generic_stream.recv().now_or_never().is_none());
1251    }
1252
1253    #[async_test]
1254    async fn test_get_event_by_id() {
1255        let client = logged_in_client(None).await;
1256        let room_id1 = room_id!("!galette:saucisse.bzh");
1257        let room_id2 = room_id!("!crepe:saucisse.bzh");
1258
1259        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
1260        client.base_client().get_or_create_room(room_id2, RoomState::Joined);
1261
1262        let event_cache = client.event_cache();
1263        event_cache.subscribe().unwrap();
1264
1265        // Insert two rooms with a few events.
1266        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));
1267
1268        let eid1 = event_id!("$1");
1269        let eid2 = event_id!("$2");
1270        let eid3 = event_id!("$3");
1271
1272        let joined_room_update1 = JoinedRoomUpdate {
1273            timeline: Timeline {
1274                events: vec![
1275                    f.text_msg("hey").event_id(eid1).into(),
1276                    f.text_msg("you").event_id(eid2).into(),
1277                ],
1278                ..Default::default()
1279            },
1280            ..Default::default()
1281        };
1282
1283        let joined_room_update2 = JoinedRoomUpdate {
1284            timeline: Timeline {
1285                events: vec![f.text_msg("bjr").event_id(eid3).into()],
1286                ..Default::default()
1287            },
1288            ..Default::default()
1289        };
1290
1291        let mut updates = RoomUpdates::default();
1292        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
1293        updates.joined.insert(room_id2.to_owned(), joined_room_update2);
1294
1295        // Have the event cache handle them.
1296        event_cache.inner.handle_room_updates(updates).await.unwrap();
1297
1298        // We can find the events in a single room.
1299        let room1 = client.get_room(room_id1).unwrap();
1300
1301        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();
1302
1303        let found1 = room_event_cache.find_event(eid1).await.unwrap();
1304        assert_event_matches_msg(&found1, "hey");
1305
1306        let found2 = room_event_cache.find_event(eid2).await.unwrap();
1307        assert_event_matches_msg(&found2, "you");
1308
1309        // Retrieving the event with id3 from the room which doesn't contain it will
1310        // fail…
1311        assert!(room_event_cache.find_event(eid3).await.is_none());
1312    }
1313
1314    #[async_test]
1315    async fn test_save_event() {
1316        let client = logged_in_client(None).await;
1317        let room_id = room_id!("!galette:saucisse.bzh");
1318
1319        let event_cache = client.event_cache();
1320        event_cache.subscribe().unwrap();
1321
1322        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1323        let event_id = event_id!("$1");
1324
1325        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1326        let room = client.get_room(room_id).unwrap();
1327
1328        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1329        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;
1330
1331        // Retrieving the event at the room-wide cache works.
1332        assert!(room_event_cache.find_event(event_id).await.is_some());
1333    }
1334
1335    #[async_test]
1336    async fn test_generic_update_when_loading_rooms() {
1337        // Create 2 rooms. One of them has data in the event cache storage.
1338        let user = user_id!("@mnt_io:matrix.org");
1339        let client = logged_in_client(None).await;
1340        let room_id_0 = room_id!("!raclette:patate.ch");
1341        let room_id_1 = room_id!("!fondue:patate.ch");
1342
1343        let event_factory = EventFactory::new().room(room_id_0).sender(user);
1344
1345        let event_cache = client.event_cache();
1346        event_cache.subscribe().unwrap();
1347
1348        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
1349        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
1350
1351        client
1352            .event_cache_store()
1353            .lock()
1354            .await
1355            .unwrap()
1356            .handle_linked_chunk_updates(
1357                LinkedChunkId::Room(room_id_0),
1358                vec![
1359                    // Non-empty items chunk.
1360                    Update::NewItemsChunk {
1361                        previous: None,
1362                        new: ChunkIdentifier::new(0),
1363                        next: None,
1364                    },
1365                    Update::PushItems {
1366                        at: Position::new(ChunkIdentifier::new(0), 0),
1367                        items: vec![
1368                            event_factory
1369                                .text_msg("hello")
1370                                .sender(user)
1371                                .event_id(event_id!("$ev0"))
1372                                .into_event(),
1373                        ],
1374                    },
1375                ],
1376            )
1377            .await
1378            .unwrap();
1379
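        // Subscribe to room-wide generic updates before loading the rooms.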
1380        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1381
1382        // Room 0 has initial data, so it must trigger a generic update.
1383        {
1384            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();
1385
1386            assert_matches!(
1387                generic_stream.recv().await,
1388                Ok(RoomEventCacheGenericUpdate { room_id }) => {
1389                    assert_eq!(room_id, room_id_0);
1390                }
1391            );
1392        }
1393
1394        // Room 1 has NO initial data, so nothing should happen.
1395        {
1396            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();
1397
1398            assert!(generic_stream.recv().now_or_never().is_none());
1399        }
1400    }
1401
1402    #[async_test]
1403    async fn test_generic_update_when_paginating_room() {
1404        // Create 1 room, with 4 chunks in the event cache storage.
1405        let user = user_id!("@mnt_io:matrix.org");
1406        let client = logged_in_client(None).await;
1407        let room_id = room_id!("!raclette:patate.ch");
1408
1409        let event_factory = EventFactory::new().room(room_id).sender(user);
1410
1411        let event_cache = client.event_cache();
1412        event_cache.subscribe().unwrap();
1413
1414        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1415
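        // Prefill the event cache store with four chunks: chunks 0 and 1 are empty,
        // while chunks 2 and 3 each contain a single event.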
1416        client
1417            .event_cache_store()
1418            .lock()
1419            .await
1420            .unwrap()
1421            .handle_linked_chunk_updates(
1422                LinkedChunkId::Room(room_id),
1423                vec![
1424                    // Empty chunk.
1425                    Update::NewItemsChunk {
1426                        previous: None,
1427                        new: ChunkIdentifier::new(0),
1428                        next: None,
1429                    },
1430                    // Empty chunk.
1431                    Update::NewItemsChunk {
1432                        previous: Some(ChunkIdentifier::new(0)),
1433                        new: ChunkIdentifier::new(1),
1434                        next: None,
1435                    },
1436                    // Non-empty items chunk.
1437                    Update::NewItemsChunk {
1438                        previous: Some(ChunkIdentifier::new(1)),
1439                        new: ChunkIdentifier::new(2),
1440                        next: None,
1441                    },
1442                    Update::PushItems {
1443                        at: Position::new(ChunkIdentifier::new(2), 0),
1444                        items: vec![
1445                            event_factory
1446                                .text_msg("hello")
1447                                .sender(user)
1448                                .event_id(event_id!("$ev0"))
1449                                .into_event(),
1450                        ],
1451                    },
1452                    // Non-empty items chunk.
1453                    Update::NewItemsChunk {
1454                        previous: Some(ChunkIdentifier::new(2)),
1455                        new: ChunkIdentifier::new(3),
1456                        next: None,
1457                    },
1458                    Update::PushItems {
1459                        at: Position::new(ChunkIdentifier::new(3), 0),
1460                        items: vec![
1461                            event_factory
1462                                .text_msg("world")
1463                                .sender(user)
1464                                .event_id(event_id!("$ev1"))
1465                                .into_event(),
1466                        ],
1467                    },
1468                ],
1469            )
1470            .await
1471            .unwrap();
1472
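        // Subscribe to generic updates before the room is initialised.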
1473        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1474
1475        // The room is initialised: it loads one event into the timeline.
1476        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1477
1478        assert_matches!(
1479            generic_stream.recv().await,
1480            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1481                assert_eq!(room_id, expected_room_id);
1482            }
1483        );
1484
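        // The back-paginations below load chunks from the store populated above, one
        // chunk per call.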
1485        let pagination = room_event_cache.pagination();
1486
1487        // Paginate: one new event is added to the timeline.
1488        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();
1489
1490        assert_eq!(pagination_outcome.events.len(), 1);
1491        assert!(pagination_outcome.reached_start.not());
1492        assert_matches!(
1493            generic_stream.recv().await,
1494            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1495                assert_eq!(room_id, expected_room_id);
1496            }
1497        );
1498
1499        // Paginate again: no new events are added to the timeline.
1500        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();
1501
1502        assert!(pagination_outcome.events.is_empty());
1503        assert!(pagination_outcome.reached_start.not());
1504        assert!(generic_stream.recv().now_or_never().is_none());
1505
1506        // Paginate once more: the start is reached and no generic update is emitted.
1507        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();
1508
1509        assert!(pagination_outcome.reached_start);
1510        assert!(generic_stream.recv().now_or_never().is_none());
1511    }
1512
1513    #[async_test]
1514    async fn test_for_room_when_room_is_not_found() {
1515        let client = logged_in_client(None).await;
1516        let room_id = room_id!("!raclette:patate.ch");
1517
1518        let event_cache = client.event_cache();
1519        event_cache.subscribe().unwrap();
1520
1521        // The room doesn't exist, so `for_room` returns an error.
1522        assert_matches!(
1523            event_cache.for_room(room_id).await,
1524            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
1525                assert_eq!(room_id, not_found_room_id);
1526            }
1527        );
1528
1529        // Now create the room.
1530        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1531
1532        // The room exists now, so everything works fine.
1533        assert!(event_cache.for_room(room_id).await.is_ok());
1534    }
1535
1536    /// Test that the event cache does not create reference cycles or tasks that
1537    /// retain a reference to it indefinitely, which would prevent it from being deallocated.
1538    #[cfg(not(target_family = "wasm"))]
1539    #[async_test]
1540    async fn test_no_refcycle_event_cache_tasks() {
1541        let client = MockClientBuilder::new(None).build().await;
1542
1543        // Wait for the init tasks to die.
1544        sleep(Duration::from_secs(1)).await;
1545
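        // At this point, the client should hold the only strong reference to the
        // event cache's inner state.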
1546        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
1547        assert_eq!(event_cache_weak.strong_count(), 1);
1548
1549        {
1550            let room_id = room_id!("!room:example.org");
1551
1552            // Make the client aware of the room.
1553            let response = SyncResponseBuilder::default()
1554                .add_joined_room(JoinedRoomBuilder::new(room_id))
1555                .build_sync_response();
1556            client.inner.base_client.receive_sync_response(response).await.unwrap();
1557
1558            client.event_cache().subscribe().unwrap();
1559
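            // Getting the per-room event cache may spawn background tasks; none of
            // them should keep the inner event cache alive once the client is gone.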
1560            let (_room_event_cache, _drop_handles) =
1561                client.get_room(room_id).unwrap().event_cache().await.unwrap();
1562        }
1563
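        // Dropping the client releases its strong reference to the event cache.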
1564        drop(client);
1565
1566        // Give a bit of time for background tasks to die.
1567        sleep(Duration::from_secs(1)).await;
1568
1569        // No strong references should remain now that the Client has been dropped.
1570        assert_eq!(
1571            event_cache_weak.strong_count(),
1572            0,
1573            "Too many strong references to the event cache {}",
1574            event_cache_weak.strong_count()
1575        );
1576    }
1577}