matrix_sdk/event_cache/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The event cache is an abstraction layer, sitting between the Rust SDK and a
16//! final client, that acts as a global observer of all the rooms, gathering and
17//! inferring some extra useful information about each room. In particular, it
18//! doesn't require subscribing to a specific room to get access to this
19//! information.
20//!
21//! It's intended to be fast, robust and easy to maintain, having learned from
22//! previous endeavours at implementing middle to high level features elsewhere
23//! in the SDK, notably in the UI's Timeline object.
24//!
25//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
26//! details about the historical reasons that led us to start writing this.
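//!
//! A rough usage sketch, assuming a `client` (a `Client`) and a `room` it
//! already knows about (not compiled as a doctest here):
//!
//! ```ignore
//! // Enable the event cache once per client; this is cheap and idempotent.
//! let event_cache = client.event_cache();
//! event_cache.subscribe()?;
//!
//! // Then get a room-wide view over it, and observe the room's timeline.
//! let (room_event_cache, _drop_handles) = room.event_cache().await?;
//! let (initial_events, mut updates) = room_event_cache.subscribe().await;
//! ```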
27
28#![forbid(missing_docs)]
29
30use std::{
31    collections::{BTreeMap, HashMap},
32    fmt,
33    sync::{Arc, OnceLock, Weak},
34};
35
36use eyeball::{SharedObservable, Subscriber};
37use eyeball_im::VectorDiff;
38use futures_util::future::{join_all, try_join_all};
39use matrix_sdk_base::{
40    deserialized_responses::{AmbiguityChange, TimelineEvent},
41    event_cache::{
42        store::{EventCacheStoreError, EventCacheStoreLock},
43        Gap,
44    },
45    executor::AbortOnDrop,
46    linked_chunk::{self, lazy_loader::LazyLoaderError, OwnedLinkedChunkId},
47    serde_helpers::extract_thread_root_from_content,
48    store_locks::LockStoreError,
49    sync::RoomUpdates,
50    timer, ThreadingSupport,
51};
52use matrix_sdk_common::executor::{spawn, JoinHandle};
53use room::RoomEventCacheState;
54#[cfg(feature = "experimental-search")]
55use ruma::events::AnySyncMessageLikeEvent;
56use ruma::{
57    events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, OwnedTransactionId,
58    RoomId,
59};
60use tokio::{
61    select,
62    sync::{
63        broadcast::{channel, error::RecvError, Receiver, Sender},
64        mpsc, Mutex, RwLock,
65    },
66};
67use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span};
68
69use crate::{
70    client::WeakClient,
71    send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
72    Client,
73};
74
75mod deduplicator;
76mod pagination;
77mod room;
78
79pub use pagination::{RoomPagination, RoomPaginationStatus};
80pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate};
81
82/// An error observed in the [`EventCache`].
83#[derive(thiserror::Error, Debug)]
84pub enum EventCacheError {
85    /// The [`EventCache`] instance hasn't been initialized with
86    /// [`EventCache::subscribe`].
87    #[error(
88        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
89    )]
90    NotSubscribedYet,
91
92    /// Room is not found.
93    #[error("Room `{room_id}` is not found.")]
94    RoomNotFound {
95        /// The ID of the room that wasn't found.
96        room_id: OwnedRoomId,
97    },
98
99    /// An error has been observed while back-paginating.
100    #[error(transparent)]
101    BackpaginationError(Box<crate::Error>),
102
103    /// Back-pagination was already happening in a given room when we tried to
104    /// back-paginate again.
105    #[error("We were already back-paginating.")]
106    AlreadyBackpaginating,
107
108    /// An error happening when interacting with storage.
109    #[error(transparent)]
110    Storage(#[from] EventCacheStoreError),
111
112    /// An error happening when attempting to (cross-process) lock storage.
113    #[error(transparent)]
114    LockingStorage(#[from] LockStoreError),
115
116    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
117    /// to. It's possible this weak reference points to nothing anymore, at the
118    /// time we try to use the client.
119    #[error("The owning client of the event cache has been dropped.")]
120    ClientDropped,
121
122    /// An error happening when interacting with the [`LinkedChunk`]'s lazy
123    /// loader.
124    ///
125    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
126    #[error(transparent)]
127    LinkedChunkLoader(#[from] LazyLoaderError),
128
129    /// An error happened when reading the metadata of a linked chunk, upon
130    /// reload.
131    #[error("the linked chunk metadata is invalid: {details}")]
132    InvalidLinkedChunkMetadata {
133        /// A string containing details about the error.
134        details: String,
135    },
136}
137
138/// A result using the [`EventCacheError`].
139pub type Result<T> = std::result::Result<T, EventCacheError>;
140
141/// Holds handles to the tasks spawned by an [`EventCache`].
142pub struct EventCacheDropHandles {
143    /// Task that listens to room updates.
144    listen_updates_task: JoinHandle<()>,
145
146    /// Task that listens to updates to the user's ignored list.
147    ignore_user_list_update_task: JoinHandle<()>,
148
149    /// The task used to automatically shrink the linked chunks.
150    auto_shrink_linked_chunk_task: JoinHandle<()>,
151}
152
153impl fmt::Debug for EventCacheDropHandles {
154    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
156    }
157}
158
159impl Drop for EventCacheDropHandles {
160    fn drop(&mut self) {
161        self.listen_updates_task.abort();
162        self.ignore_user_list_update_task.abort();
163        self.auto_shrink_linked_chunk_task.abort();
164    }
165}
166
167/// An event cache, providing lots of useful functionality for clients.
168///
169/// Cloning is shallow, and thus is cheap to do.
170///
171/// See also the module-level comment.
172#[derive(Clone)]
173pub struct EventCache {
174    /// Reference to the inner cache.
175    inner: Arc<EventCacheInner>,
176}
177
178impl fmt::Debug for EventCache {
179    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180        f.debug_struct("EventCache").finish_non_exhaustive()
181    }
182}
183
184impl EventCache {
185    /// Create a new [`EventCache`] for the given client.
186    pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
187        let (generic_update_sender, _) = channel(32);
188        let (linked_chunk_update_sender, _) = channel(32);
189
190        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(32);
191        let thread_subscriber_task = AbortOnDrop::new(spawn(Self::thread_subscriber_task(
192            client.clone(),
193            linked_chunk_update_sender.clone(),
194            thread_subscriber_sender,
195        )));
196
197        #[cfg(feature = "experimental-search")]
198        let search_indexing_task = AbortOnDrop::new(spawn(Self::search_indexing_task(
199            client.clone(),
200            linked_chunk_update_sender.clone(),
201        )));
202
203        Self {
204            inner: Arc::new(EventCacheInner {
205                client,
206                store: event_cache_store,
207                multiple_room_updates_lock: Default::default(),
208                by_room: Default::default(),
209                drop_handles: Default::default(),
210                auto_shrink_sender: Default::default(),
211                generic_update_sender,
212                linked_chunk_update_sender,
213                _thread_subscriber_task: thread_subscriber_task,
214                #[cfg(feature = "experimental-search")]
215                _search_indexing_task: search_indexing_task,
216                thread_subscriber_receiver,
217            }),
218        }
219    }
220
221    /// Subscribes to notifications that a thread subscription has been sent.
222    ///
223    /// For testing purposes only.
224    #[doc(hidden)]
225    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
226        self.inner.thread_subscriber_receiver.resubscribe()
227    }
228
229    /// Starts subscribing the [`EventCache`] to sync responses, if not done
230    /// before.
231    ///
232    /// Re-running this has no effect if we already subscribed before, and is
233    /// cheap.
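    ///
    /// A minimal sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// let event_cache = client.event_cache();
    /// // Idempotent: calling it again later is harmless.
    /// event_cache.subscribe()?;
    /// ```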
234    pub fn subscribe(&self) -> Result<()> {
235        let client = self.inner.client()?;
236
237        // Initialize the drop handles.
238        let _ = self.inner.drop_handles.get_or_init(|| {
239            // Spawn the task that will listen to all the room updates at once.
240            let listen_updates_task = spawn(Self::listen_task(
241                self.inner.clone(),
242                client.subscribe_to_all_room_updates(),
243            ));
244
245            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
246                self.inner.clone(),
247                client.subscribe_to_ignore_user_list_changes(),
248            ));
249
250            let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);
251
252            // Force-initialize the sender in the [`EventCacheInner`].
253            self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);
254
255            let auto_shrink_linked_chunk_task = spawn(Self::auto_shrink_linked_chunk_task(
256                Arc::downgrade(&self.inner),
257                auto_shrink_receiver,
258            ));
259
260            Arc::new(EventCacheDropHandles {
261                listen_updates_task,
262                ignore_user_list_update_task,
263                auto_shrink_linked_chunk_task,
264            })
265        });
266
267        Ok(())
268    }
269
270    #[instrument(skip_all)]
271    async fn ignore_user_list_update_task(
272        inner: Arc<EventCacheInner>,
273        mut ignore_user_list_stream: Subscriber<Vec<String>>,
274    ) {
275        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
276        span.follows_from(Span::current());
277
278        async move {
279            while ignore_user_list_stream.next().await.is_some() {
280                info!("Received an ignore user list change");
281                if let Err(err) = inner.clear_all_rooms().await {
282                    error!("when clearing room storage after ignore user list change: {err}");
283                }
284            }
285            info!("Ignore user list stream has closed");
286        }
287        .instrument(span)
288        .await;
289    }
290
291    /// For benchmarking purposes only.
292    #[doc(hidden)]
293    pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
294        self.inner.handle_room_updates(updates).await
295    }
296
297    #[instrument(skip_all)]
298    async fn listen_task(
299        inner: Arc<EventCacheInner>,
300        mut room_updates_feed: Receiver<RoomUpdates>,
301    ) {
302        trace!("Spawning the listen task");
303        loop {
304            match room_updates_feed.recv().await {
305                Ok(updates) => {
306                    trace!("Receiving `RoomUpdates`");
307
308                    if let Err(err) = inner.handle_room_updates(updates).await {
309                        match err {
310                            EventCacheError::ClientDropped => {
311                                // The client has dropped, exit the listen task.
312                                info!(
313                                    "Closing the event cache global listen task because client dropped"
314                                );
315                                break;
316                            }
317                            err => {
318                                error!("Error when handling room updates: {err}");
319                            }
320                        }
321                    }
322                }
323
324                Err(RecvError::Lagged(num_skipped)) => {
325                    // Forget everything we know; we could have missed events, and we have
326                    // no way to reconcile at the moment!
327                    // TODO: implement Smart Matching™.
328                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
329                    if let Err(err) = inner.clear_all_rooms().await {
330                        error!("when clearing storage after lag in listen_task: {err}");
331                    }
332                }
333
334                Err(RecvError::Closed) => {
335                    // The sender has shut down, exit.
336                    info!("Closing the event cache global listen task because receiver closed");
337                    break;
338                }
339            }
340        }
341    }
342
343    /// Spawns the task that will listen to auto-shrink notifications.
344    ///
345    /// The auto-shrink mechanism works this way:
346    ///
347    /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
348    ///   increment the active number of subscribers to that room, aka
349    ///   [`RoomEventCacheState::subscriber_count`].
350    /// - When that subscriber is dropped, it will decrement that count, and
351    ///   notify the task below if it reaches 0.
352    /// - The task spawned here, owned by the [`EventCacheInner`], will listen
353    ///   to such notifications that a room may be shrunk. It will attempt an
354    ///   auto-shrink, by letting the inner state decide whether this is a good
355    ///   time to do so (new subscribers might have spawned in the meanwhile).
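    ///
    /// Roughly, the flow looks like this (a sketch only: the channel variable
    /// names are illustrative, not the exact code):
    ///
    /// ```ignore
    /// // On the subscriber side, when the count drops to zero:
    /// let _ = auto_shrink_sender.send(room_id).await;
    ///
    /// // In this task: try to shrink, but only if nobody re-subscribed since.
    /// while let Some(room_id) = auto_shrink_receiver.recv().await {
    ///     let room = inner.for_room(&room_id).await?;
    ///     let mut state = room.inner.state.write().await;
    ///     state.auto_shrink_if_no_subscribers().await?;
    /// }
    /// ```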
356    #[instrument(skip_all)]
357    async fn auto_shrink_linked_chunk_task(
358        inner: Weak<EventCacheInner>,
359        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
360    ) {
361        while let Some(room_id) = rx.recv().await {
362            trace!(for_room = %room_id, "received notification to shrink");
363
364            let Some(inner) = inner.upgrade() else {
365                return;
366            };
367
368            let room = match inner.for_room(&room_id).await {
369                Ok(room) => room,
370                Err(err) => {
371                    warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
372                    continue;
373                }
374            };
375
376            trace!("waiting for state lock…");
377            let mut state = room.inner.state.write().await;
378
379            match state.auto_shrink_if_no_subscribers().await {
380                Ok(diffs) => {
381                    if let Some(diffs) = diffs {
382                        // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
383                        // subscribers, right? RIGHT? Especially because the state is guarded behind
384                        // a lock.
385                        //
386                        // However, better safe than sorry, and it's cheap to send an update here,
387                        // so let's do it!
388                        if !diffs.is_empty() {
389                            let _ = room.inner.sender.send(
390                                RoomEventCacheUpdate::UpdateTimelineEvents {
391                                    diffs,
392                                    origin: EventsOrigin::Cache,
393                                },
394                            );
395                        }
396                    } else {
397                        debug!("auto-shrinking didn't happen");
398                    }
399                }
400
401                Err(err) => {
402                    // There's not much we can do here, unfortunately.
403                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
404                }
405            }
406        }
407
408        info!("Auto-shrink linked chunk task has been closed, exiting");
409    }
410
411    /// Return a room-specific view over the [`EventCache`].
412    pub(crate) async fn for_room(
413        &self,
414        room_id: &RoomId,
415    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
416        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
417            return Err(EventCacheError::NotSubscribedYet);
418        };
419
420        let room = self.inner.for_room(room_id).await?;
421
422        Ok((room, drop_handles))
423    }
424
425    /// Cleanly clear all the rooms' event caches.
426    ///
427    /// This will notify any live observers that the room has been cleared.
428    pub async fn clear_all_rooms(&self) -> Result<()> {
429        self.inner.clear_all_rooms().await
430    }
431
432    /// Subscribe to room _generic_ updates.
433    ///
434    /// If one wants to listen to what has changed in a specific room,
435    /// [`RoomEventCache::subscribe`] is recommended. Note however that the
436    /// [`RoomEventCacheSubscriber`] type triggers side-effects.
437    ///
438    /// If one wants high-level, generic updates for rooms, without any
439    /// side-effects, this method is recommended instead. Also, dropping the
440    /// receiver of this channel will not trigger any side-effect.
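    ///
    /// A rough usage sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// let mut updates = event_cache.subscribe_to_room_generic_updates();
    /// while let Ok(RoomEventCacheGenericUpdate { room_id }) = updates.recv().await {
    ///     // Something changed in `room_id`; check the room's event cache if needed.
    /// }
    /// ```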
441    pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
442        self.inner.generic_update_sender.subscribe()
443    }
444
445    /// React to a given linked chunk update by subscribing the user to a
446    /// thread, if need be (when the user got mentioned in a thread reply, in
447    /// a thread they were not subscribed to).
448    ///
449    /// Returns a boolean indicating whether the task should keep on running or
450    /// not.
451    #[instrument(skip(client, thread_subscriber_sender))]
452    async fn handle_thread_subscriber_linked_chunk_update(
453        client: &WeakClient,
454        thread_subscriber_sender: &Sender<()>,
455        up: RoomEventCacheLinkedChunkUpdate,
456    ) -> bool {
457        let Some(client) = client.get() else {
458            // Client shutting down.
459            debug!("Client is shutting down, exiting thread subscriber task");
460            return false;
461        };
462
463        let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
464            trace!("received an update for a non-thread linked chunk, ignoring");
465            return true;
466        };
467
468        let Some(room) = client.get_room(room_id) else {
469            warn!(%room_id, "unknown room");
470            return true;
471        };
472
473        let thread_root = thread_root.clone();
474
475        let mut new_events = up.events().peekable();
476
477        if new_events.peek().is_none() {
478            // No new events, nothing to do.
479            return true;
480        }
481
482        // This `PushContext` is going to be used to compute whether an in-thread event
483        // would trigger a mention.
484        //
485        // Of course, we're not interested in an in-thread event causing a mention
486        // just because it's part of a thread we've already subscribed to. So the
487        // `PushContext` must not include the check for thread subscriptions (otherwise
488        // it would be impossible to subscribe to new threads).
489
490        let with_thread_subscriptions = false;
491
492        let Some(push_context) = room
493            .push_context_internal(with_thread_subscriptions)
494            .await
495            .inspect_err(|err| {
496                warn!("Failed to get push context for threads: {err}");
497            })
498            .ok()
499            .flatten()
500        else {
501            warn!("Missing push context for thread subscriptions.");
502            return true;
503        };
504
505        let mut subscribe_up_to = None;
506
507        // Find if there's an event that would trigger a mention for the current
508        // user, iterating from the end of the new events towards the oldest, so we can
509        // find the most recent event to subscribe to.
510        for ev in new_events.rev() {
511            if push_context
512                .for_event(ev.raw())
513                .await
514                .into_iter()
515                .any(|action| action.should_notify())
516            {
517                let Some(event_id) = ev.event_id() else {
518                    // Shouldn't happen.
519                    continue;
520                };
521                subscribe_up_to = Some(event_id);
522                break;
523            }
524        }
525
526        // And if we've found such a mention, subscribe to the thread up to this
527        // event.
528        if let Some(event_id) = subscribe_up_to {
529            trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
530            if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
531                warn!(%err, "Failed to subscribe to thread");
532            } else {
533                let _ = thread_subscriber_sender.send(());
534            }
535        }
536
537        true
538    }
539
540    /// React to a given send queue update by subscribing the user to a
541    /// thread, if need be (when the user sent an event in a thread they were
542    /// not subscribed to).
543    ///
544    /// Returns a boolean indicating whether the task should keep on running or
545    /// not.
546    #[instrument(skip(client, thread_subscriber_sender))]
547    async fn handle_thread_subscriber_send_queue_update(
548        client: &WeakClient,
549        thread_subscriber_sender: &Sender<()>,
550        events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
551        up: SendQueueUpdate,
552    ) -> bool {
553        let Some(client) = client.get() else {
554            // Client shutting down.
555            debug!("Client is shutting down, exiting thread subscriber task");
556            return false;
557        };
558
559        let room_id = up.room_id;
560        let Some(room) = client.get_room(&room_id) else {
561            warn!(%room_id, "unknown room");
562            return true;
563        };
564
565        let (thread_root, subscribe_up_to) = match up.update {
566            RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
567                match local_echo.content {
568                    LocalEchoContent::Event { serialized_event, .. } => {
569                        if let Some(thread_root) =
570                            extract_thread_root_from_content(serialized_event.into_raw().0)
571                        {
572                            events_being_sent.insert(local_echo.transaction_id, thread_root);
573                        }
574                    }
575                    LocalEchoContent::React { .. } => {
576                        // Nothing to do, reactions don't count as a thread
577                        // subscription.
578                    }
579                }
580                return true;
581            }
582
583            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
584                events_being_sent.remove(&transaction_id);
585                return true;
586            }
587
588            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
589                if let Some(thread_root) =
590                    extract_thread_root_from_content(new_content.into_raw().0)
591                {
592                    events_being_sent.insert(transaction_id, thread_root);
593                } else {
594                    // It could be that the event isn't part of a thread anymore; handle that by
595                    // removing the pending transaction id.
596                    events_being_sent.remove(&transaction_id);
597                }
598                return true;
599            }
600
601            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
602                if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
603                    (thread_root, event_id)
604                } else {
605                    // We don't know about the event that has been sent, so ignore it.
606                    trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
607                    return true;
608                }
609            }
610
611            RoomSendQueueUpdate::SendError { .. }
612            | RoomSendQueueUpdate::RetryEvent { .. }
613            | RoomSendQueueUpdate::MediaUpload { .. } => {
614                // Nothing to do for these bad boys.
615                return true;
616            }
617        };
618
619        // The user sent an event in this thread; subscribe to the thread up to this event.
620        trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");
621        if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await
622        {
623            warn!(%err, "Failed to subscribe to thread");
624        } else {
625            let _ = thread_subscriber_sender.send(());
626        }
627
628        true
629    }
630
631    #[instrument(skip_all)]
632    async fn thread_subscriber_task(
633        client: WeakClient,
634        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
635        thread_subscriber_sender: Sender<()>,
636    ) {
637        let mut send_q_rx = if let Some(client) = client.get() {
638            if !client.enabled_thread_subscriptions() {
639                trace!("Thread subscriptions are not enabled, not spawning thread subscriber task");
640                return;
641            }
642
643            client.send_queue().subscribe()
644        } else {
645            trace!("Client is shutting down, not spawning thread subscriber task");
646            return;
647        };
648
649        let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();
650
651        // A mapping of local echoes (events being sent) to their thread root, if
652        // they are in-thread replies.
653        //
654        // Entirely managed by `handle_thread_subscriber_send_queue_update`.
655        let mut events_being_sent = HashMap::new();
656
657        loop {
658            select! {
659                res = send_q_rx.recv() => {
660                    match res {
661                        Ok(up) => {
662                            if !Self::handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
663                                break;
664                            }
665                        }
666                        Err(RecvError::Closed) => {
667                            debug!("Send queue update channel has been closed, exiting thread subscriber task");
668                            break;
669                        }
670                        Err(RecvError::Lagged(num_skipped)) => {
671                            warn!(num_skipped, "Lagged behind send queue updates");
672                        }
673                    }
674                }
675
676                res = linked_chunk_rx.recv() => {
677                    match res {
678                        Ok(up) => {
679                            if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
680                                break;
681                            }
682                        }
683                        Err(RecvError::Closed) => {
684                            debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
685                            break;
686                        }
687                        Err(RecvError::Lagged(num_skipped)) => {
688                            warn!(num_skipped, "Lagged behind linked chunk updates");
689                        }
690                    }
691                }
692            }
693        }
694    }
695
696    /// Takes each new [`TimelineEvent`] and passes it to the [`RoomIndex`] of the
697    /// given room, which will add/remove/edit an event in the index based on
698    /// the event type.
699    #[cfg(feature = "experimental-search")]
700    #[instrument(skip_all)]
701    async fn search_indexing_task(
702        client: WeakClient,
703        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
704    ) {
705        let mut linked_chunk_update_receiver = linked_chunk_update_sender.subscribe();
706
707        loop {
708            match linked_chunk_update_receiver.recv().await {
709                Ok(room_ec_lc_update) => {
710                    let OwnedLinkedChunkId::Room(room_id) =
711                        room_ec_lc_update.linked_chunk_id.clone()
712                    else {
713                        trace!("Received non-room updates, ignoring.");
714                        continue;
715                    };
716
717                    let mut timeline_events = room_ec_lc_update.events().peekable();
718
719                    if timeline_events.peek().is_none() {
720                        continue;
721                    }
722
723                    let Some(client) = client.get() else {
724                        trace!("Client is shutting down, exiting search indexing task");
725                        return;
726                    };
727
728                    let mut search_index_guard = client.search_index().lock().await;
729
730                    for event in timeline_events {
731                        if let Some(message_event) = parse_timeline_event_for_search_index(&event) {
732                            if let Err(err) =
733                                search_index_guard.handle_event(message_event, &room_id)
734                            {
735                                warn!("Failed to handle event for indexing: {err}")
736                            }
737                        }
738                    }
739                }
740                Err(RecvError::Closed) => {
741                    debug!("Linked chunk update channel has been closed, exiting search indexing task");
742                    break;
743                }
744                Err(RecvError::Lagged(num_skipped)) => {
745                    warn!(num_skipped, "Lagged behind linked chunk updates");
746                }
747            }
748        }
749    }
750}
751
752#[cfg(feature = "experimental-search")]
753fn parse_timeline_event_for_search_index(event: &TimelineEvent) -> Option<AnySyncMessageLikeEvent> {
754    use ruma::events::AnySyncTimelineEvent;
755
756    if event.kind.is_utd() {
757        return None;
758    }
759
760    match event.raw().deserialize() {
761        Ok(event) => match event {
762            AnySyncTimelineEvent::MessageLike(event) => Some(event),
763            AnySyncTimelineEvent::State(_) => None,
764        },
765
766        Err(e) => {
767            warn!("failed to parse event: {e:?}");
768            None
769        }
770    }
771}
772
773struct EventCacheInner {
774    /// A weak reference to the inner client, useful when trying to get a handle
775    /// on the owning client.
776    client: WeakClient,
777
778    /// Reference to the underlying store.
779    store: EventCacheStoreLock,
780
781    /// A lock used when many rooms must be updated at once.
782    ///
783    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
784    /// ensure that multiple updates will be applied in the correct order, which
785    /// is enforced by taking this lock when handling an update.
786    // TODO: that's the place to add a cross-process lock!
787    multiple_room_updates_lock: Mutex<()>,
788
789    /// Lazily-filled cache of live [`RoomEventCache`], once per room.
790    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,
791
792    /// Handles to keep alive the task listening to updates.
793    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
794
795    /// A sender for notifications that a room *may* need to be auto-shrunk.
796    ///
797    /// Needs to live here, so it may be passed to each [`RoomEventCache`]
798    /// instance.
799    ///
800    /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
801    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,
802
803    /// A sender for room generic updates.
804    ///
805    /// See doc comment of [`RoomEventCacheGenericUpdate`] and
806    /// [`EventCache::subscribe_to_room_generic_updates`].
807    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
808
809    /// A sender for a persisted linked chunk update.
810    ///
811    /// This is used to notify that some linked chunk has persisted updates
812    /// to the store, during sync or back-pagination of *any* linked chunk.
813    /// Observers can use this to look for new events.
814    ///
815    /// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
816    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
817
818    /// A background task listening to room and send queue updates, and
819    /// automatically subscribing the user to threads when needed, based on
820    /// the semantics of MSC4306.
821    ///
822    /// One important constraint is that there is only one such task per
823    /// [`EventCache`], so it does listen to *all* rooms at the same time.
824    _thread_subscriber_task: AbortOnDrop<()>,
825
826    /// A background task listening to room updates, and
827    /// automatically handling search index operations (add/remove/edit)
828    /// depending on the event type.
829    ///
830    /// One important constraint is that there is only one such task per
831    /// [`EventCache`], so it does listen to *all* rooms at the same time.
832    #[cfg(feature = "experimental-search")]
833    _search_indexing_task: AbortOnDrop<()>,
834
835    /// A test helper receiver that gets a notification every time the thread
836    /// subscriber task subscribes to a new thread.
837    ///
838    /// This is helpful for tests to check whether a new thread subscription
839    /// has been sent or not.
840    thread_subscriber_receiver: Receiver<()>,
841}
842
843type AutoShrinkChannelPayload = OwnedRoomId;
844
845impl EventCacheInner {
846    fn client(&self) -> Result<Client> {
847        self.client.get().ok_or(EventCacheError::ClientDropped)
848    }
849
850    /// Clears all the rooms' data.
851    async fn clear_all_rooms(&self) -> Result<()> {
852        // Okay, here's where things get complicated.
853        //
854        // On the one hand, `by_room` may include storage for *some* rooms that we know
855        // about, but not *all* of them. Any room that hasn't been loaded in the
856        // client, or touched by a sync, will remain unloaded in memory, so it
857        // will be missing from `self.by_room`. As a result, we need to make
858        // sure that we're hitting the storage backend to *really* clear all the
859        // rooms, including those that haven't been loaded yet.
860        //
861        // On the other hand, one must NOT clear the `by_room` map, because if someone
862        // subscribed to a room update, they would never get any new update for
863        // that room, since re-creating the `RoomEventCache` would create a new,
864        // unrelated sender.
865        //
866        // So we need to *keep* the rooms in `by_room` alive, while clearing them in the
867        // store backend.
868        //
869        // As a result, for a short while, the in-memory linked chunks
870        // will be desynchronized from the storage. We need to be careful then. During
871        // that short while, we don't want *anyone* to touch the linked chunk
872        // (be it in memory or in the storage).
873        //
874        // And since that requirement applies to *any* room in `by_room` at the same
875        // time, we'll have to take the locks for *all* the live rooms, so as to
876        // properly clear the underlying storage.
877        //
878        // At this point, you might be scared about the potential for deadlocking. I am
879        // as well, but I'm convinced we're fine:
880        // 1. the lock for `by_room` is usually held only for a short while, and
881        //    independently of the other two kinds.
882        // 2. the state may acquire the store cross-process lock internally, but only
883        //    while the state's methods are called (so it's always transient). As a
884        //    result, as soon as we've acquired the state locks, the store lock ought to
885        //    be free.
886        // 3. The store lock is held explicitly only in a small scoped area below.
887        // 4. Then the store lock will be held internally when calling `reset()`, but at
888        //    this point it's only held for a short while each time, so rooms will take
889        //    turns to acquire it.
890
891        let rooms = self.by_room.write().await;
892
893        // First, collect all the rooms' state locks: we can clear the storage only when
894        // nobody else can touch it at the same time.
895        let room_locks = join_all(
896            rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
897        )
898        .await;
899
900        // Clear the storage for all the rooms, using the storage facility.
901        self.store.lock().await?.clear_all_linked_chunks().await?;
902
903        // At this point, all the in-memory linked chunks are desynchronized from the
904        // storage. Resynchronize them manually by calling reset(), and
905        // propagate updates to observers.
906        try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move {
907            let updates_as_vector_diffs = state_guard.reset().await?;
908
909            let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
910                diffs: updates_as_vector_diffs,
911                origin: EventsOrigin::Cache,
912            });
913
914            let _ = room
915                .inner
916                .generic_update_sender
917                .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() });
918
919            Ok::<_, EventCacheError>(())
920        }))
921        .await?;
922
923        Ok(())
924    }
925
926    /// Handles a single set of room updates at once.
927    #[instrument(skip(self, updates))]
928    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
929        // First, take the lock that indicates we're processing updates, to avoid
930        // handling multiple updates concurrently.
931        let _lock = {
932            let _timer = timer!("Taking the `multiple_room_updates_lock`");
933            self.multiple_room_updates_lock.lock().await
934        };
935
936        // Note: bnjbvr tried to make this concurrent at some point, but it turned out
937        // to be a performance regression, even for large sync updates. Lacking
938        // time to investigate, this code remains sequential for now. See also
939        // https://github.com/matrix-org/matrix-rust-sdk/pull/5426.
940
941        // Left rooms.
942        for (room_id, left_room_update) in updates.left {
943            let room = self.for_room(&room_id).await?;
944
945            if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
946                // Non-fatal error, try to continue to the next room.
947                error!("handling left room update: {err}");
948            }
949        }
950
951        // Joined rooms.
952        for (room_id, joined_room_update) in updates.joined {
953            trace!(?room_id, "Handling a `JoinedRoomUpdate`");
954
955            let room = self.for_room(&room_id).await?;
956
957            if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
958                // Non-fatal error, try to continue to the next room.
959                error!(%room_id, "handling joined room update: {err}");
960            }
961        }
962
963        // Invited rooms.
964        // TODO: we don't do anything with `updates.invite` at this point.
965
966        Ok(())
967    }
968
969    /// Return a room-specific view over the [`EventCache`].
970    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
971        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
972        // write lock.
973        let by_room_guard = self.by_room.read().await;
974
975        match by_room_guard.get(room_id) {
976            Some(room) => Ok(room.clone()),
977
978            None => {
979                // Slow-path: the entry doesn't exist; let's acquire a write lock.
980                drop(by_room_guard);
981                let mut by_room_guard = self.by_room.write().await;
982
983                // In the meanwhile, some other caller might have obtained write access and done
984                // the same, so check for existence again.
985                if let Some(room) = by_room_guard.get(room_id) {
986                    return Ok(room.clone());
987                }
988
989                let pagination_status =
990                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });
991
992                let Some(client) = self.client.get() else {
993                    return Err(EventCacheError::ClientDropped);
994                };
995
996                let room = client
997                    .get_room(room_id)
998                    .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
999                let room_version_rules = room.clone_info().room_version_rules_or_default();
1000
1001                let enabled_thread_support = matches!(
1002                    client.base_client().threading_support,
1003                    ThreadingSupport::Enabled { .. }
1004                );
1005
1006                let room_state = RoomEventCacheState::new(
1007                    room_id.to_owned(),
1008                    room_version_rules,
1009                    enabled_thread_support,
1010                    self.linked_chunk_update_sender.clone(),
1011                    self.store.clone(),
1012                    pagination_status.clone(),
1013                )
1014                .await?;
1015
1016                let timeline_is_not_empty =
1017                    room_state.room_linked_chunk().revents().next().is_some();
1018
1019                // SAFETY: we must have subscribed before reaching this code, otherwise
1020                // something is very wrong.
1021                let auto_shrink_sender =
1022                    self.auto_shrink_sender.get().cloned().expect(
1023                        "we must have called `EventCache::subscribe()` before calling here.",
1024                    );
1025
1026                let room_event_cache = RoomEventCache::new(
1027                    self.client.clone(),
1028                    room_state,
1029                    pagination_status,
1030                    room_id.to_owned(),
1031                    auto_shrink_sender,
1032                    self.generic_update_sender.clone(),
1033                );
1034
1035                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
1036
1037                // If at least one event has been loaded, it means there is a timeline. Let's
1038                // emit a generic update.
1039                if timeline_is_not_empty {
1040                    let _ = self
1041                        .generic_update_sender
1042                        .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
1043                }
1044
1045                Ok(room_event_cache)
1046            }
1047        }
1048    }
1049}
1050
1051/// The result of a single back-pagination request.
1052#[derive(Debug)]
1053pub struct BackPaginationOutcome {
1054    /// Did the back-pagination reach the start of the timeline?
1055    pub reached_start: bool,
1056
1057    /// All the events that have been returned in the back-pagination
1058    /// request.
1059    ///
1060    /// Events are presented in reverse order: the first element of the vec,
1061    /// if present, is the most "recent" event from the chunk (or
1062    /// technically, the last one in the topological ordering).
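    ///
    /// For instance, to walk the returned chunk from oldest to most recent,
    /// one can reverse the iteration (a sketch):
    ///
    /// ```ignore
    /// for event in outcome.events.iter().rev() {
    ///     // …from oldest to most recent…
    /// }
    /// ```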
1063    pub events: Vec<TimelineEvent>,
1064}
1065
1066/// Represents a timeline update of a room. It hides the details of
1067/// [`RoomEventCacheUpdate`] by being more generic.
1068///
1069/// This is used by [`EventCache::subscribe_to_room_generic_updates`]. Please
1070/// read it to learn more about the motivation behind this type.
1071#[derive(Clone, Debug)]
1072pub struct RoomEventCacheGenericUpdate {
1073    /// The room ID owning the timeline.
1074    pub room_id: OwnedRoomId,
1075}
1076
1077/// An update triggered when events change in the persisted event cache
1078/// for any room.
1079#[derive(Clone, Debug)]
1080struct RoomEventCacheLinkedChunkUpdate {
1081    /// The linked chunk affected by the update.
1082    linked_chunk_id: OwnedLinkedChunkId,
1083
1084    /// A vector of all the linked chunk updates that happened during this event
1085    /// cache update.
1086    updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
1087}
1088
1089impl RoomEventCacheLinkedChunkUpdate {
1090    /// Return all the new events propagated by this update, in topological
1091    /// order.
1092    pub fn events(self) -> impl DoubleEndedIterator<Item = TimelineEvent> {
1093        use itertools::Either;
1094        self.updates.into_iter().flat_map(|update| match update {
1095            linked_chunk::Update::PushItems { items, .. } => {
1096                Either::Left(Either::Left(items.into_iter()))
1097            }
1098            linked_chunk::Update::ReplaceItem { item, .. } => {
1099                Either::Left(Either::Right(std::iter::once(item)))
1100            }
1101            linked_chunk::Update::RemoveItem { .. }
1102            | linked_chunk::Update::DetachLastItems { .. }
1103            | linked_chunk::Update::StartReattachItems
1104            | linked_chunk::Update::EndReattachItems
1105            | linked_chunk::Update::NewItemsChunk { .. }
1106            | linked_chunk::Update::NewGapChunk { .. }
1107            | linked_chunk::Update::RemoveChunk(..)
1108            | linked_chunk::Update::Clear => {
1109                // All these updates don't contain any new event.
1110                Either::Right(std::iter::empty())
1111            }
1112        })
1113    }
1114}
1115
1116/// An update related to events that happened in a room.
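///
/// A subscriber obtained via [`RoomEventCache::subscribe`] typically matches on
/// these variants; a rough sketch (not compiled as a doctest):
///
/// ```ignore
/// while let Ok(update) = stream.recv().await {
///     match update {
///         RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
///             // Apply the `VectorDiff`s to a local copy of the timeline.
///         }
///         RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => { /* … */ }
///         _ => {}
///     }
/// }
/// ```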
1117#[derive(Debug, Clone)]
1118pub enum RoomEventCacheUpdate {
1119    /// The fully read marker has moved to a different event.
1120    MoveReadMarkerTo {
1121        /// Event at which the read marker is now pointing.
1122        event_id: OwnedEventId,
1123    },
1124
1125    /// The members have changed.
1126    UpdateMembers {
1127        /// Collection of ambiguity changes that room member events trigger.
1128        ///
1129        /// This is a map of event ID of the `m.room.member` event to the
1130        /// details of the ambiguity change.
1131        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
1132    },
1133
1134    /// The room has received updates for the timeline as _diffs_.
1135    UpdateTimelineEvents {
1136        /// Diffs to apply to the timeline.
1137        diffs: Vec<VectorDiff<TimelineEvent>>,
1138
1139        /// Where the diffs are coming from.
1140        origin: EventsOrigin,
1141    },
1142
1143    /// The room has received new ephemeral events.
1144    AddEphemeralEvents {
1145        /// XXX: this is temporary, until read receipts are handled in the event
1146        /// cache
1147        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1148    },
1149}
1150
1151/// Indicate where events are coming from.
1152#[derive(Debug, Clone)]
1153pub enum EventsOrigin {
1154    /// Events are coming from a sync.
1155    Sync,
1156
1157    /// Events are coming from pagination.
1158    Pagination,
1159
1160    /// The cause of the change is purely internal to the cache.
1161    Cache,
1162}
1163
1164#[cfg(test)]
1165mod tests {
1166    use std::{ops::Not, sync::Arc, time::Duration};
1167
1168    use assert_matches::assert_matches;
1169    use futures_util::FutureExt as _;
1170    use matrix_sdk_base::{
1171        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
1172        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
1173        RoomState,
1174    };
1175    use matrix_sdk_test::{
1176        async_test, event_factory::EventFactory, JoinedRoomBuilder, SyncResponseBuilder,
1177    };
1178    use ruma::{event_id, room_id, serde::Raw, user_id};
1179    use serde_json::json;
1180    use tokio::time::sleep;
1181
1182    use super::{EventCacheError, RoomEventCacheGenericUpdate, RoomEventCacheUpdate};
1183    use crate::test_utils::{
1184        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
1185    };
1186
1187    #[async_test]
1188    async fn test_must_explicitly_subscribe() {
1189        let client = logged_in_client(None).await;
1190
1191        let event_cache = client.event_cache();
1192
1193        // If I create a room event subscriber for a room before subscribing the event
1194        // cache,
1195        let room_id = room_id!("!omelette:fromage.fr");
1196        let result = event_cache.for_room(room_id).await;
1197
1198        // Then it fails, because one must explicitly call `.subscribe()` on the event
1199        // cache.
1200        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
1201    }
1202
1203    #[async_test]
1204    async fn test_uniq_read_marker() {
1205        let client = logged_in_client(None).await;
1206        let room_id = room_id!("!galette:saucisse.bzh");
1207        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1208
1209        let event_cache = client.event_cache();
1210
1211        event_cache.subscribe().unwrap();
1212
1213        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1214        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
1215        let (events, mut stream) = room_event_cache.subscribe().await;
1216
1217        assert!(events.is_empty());
1218
1219        // When sending the same read marker event multiple times,…
1220        let read_marker_event = Raw::from_json_string(
1221            json!({
1222                "content": {
1223                    "event_id": "$crepe:saucisse.bzh"
1224                },
1225                "room_id": "!galette:saucisse.bzh",
1226                "type": "m.fully_read"
1227            })
1228            .to_string(),
1229        )
1230        .unwrap();
1231        let account_data = vec![read_marker_event; 100];
1232
1233        room_event_cache
1234            .inner
1235            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
1236            .await
1237            .unwrap();
1238
1239        // … there's only one read marker update.
1240        assert_matches!(
1241            stream.recv().await.unwrap(),
1242            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
1243        );
1244
1245        assert!(stream.recv().now_or_never().is_none());
1246
1247        // None, because account data doesn't trigger a generic update.
1248        assert!(generic_stream.recv().now_or_never().is_none());
1249    }
1250
1251    #[async_test]
1252    async fn test_get_event_by_id() {
1253        let client = logged_in_client(None).await;
1254        let room_id1 = room_id!("!galette:saucisse.bzh");
1255        let room_id2 = room_id!("!crepe:saucisse.bzh");
1256
1257        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
1258        client.base_client().get_or_create_room(room_id2, RoomState::Joined);
1259
1260        let event_cache = client.event_cache();
1261        event_cache.subscribe().unwrap();
1262
1263        // Insert two rooms with a few events.
1264        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));
1265
1266        let eid1 = event_id!("$1");
1267        let eid2 = event_id!("$2");
1268        let eid3 = event_id!("$3");
1269
1270        let joined_room_update1 = JoinedRoomUpdate {
1271            timeline: Timeline {
1272                events: vec![
1273                    f.text_msg("hey").event_id(eid1).into(),
1274                    f.text_msg("you").event_id(eid2).into(),
1275                ],
1276                ..Default::default()
1277            },
1278            ..Default::default()
1279        };
1280
1281        let joined_room_update2 = JoinedRoomUpdate {
1282            timeline: Timeline {
1283                events: vec![f.text_msg("bjr").event_id(eid3).into()],
1284                ..Default::default()
1285            },
1286            ..Default::default()
1287        };
1288
1289        let mut updates = RoomUpdates::default();
1290        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
1291        updates.joined.insert(room_id2.to_owned(), joined_room_update2);
1292
1293        // Have the event cache handle them.
1294        event_cache.inner.handle_room_updates(updates).await.unwrap();
1295
1296        // We can find the events in a single room.
1297        let room1 = client.get_room(room_id1).unwrap();
1298
1299        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();
1300
1301        let found1 = room_event_cache.find_event(eid1).await.unwrap();
1302        assert_event_matches_msg(&found1, "hey");
1303
1304        let found2 = room_event_cache.find_event(eid2).await.unwrap();
1305        assert_event_matches_msg(&found2, "you");
1306
1307        // Retrieving the event with id3 from the room which doesn't contain it will
1308        // fail…
1309        assert!(room_event_cache.find_event(eid3).await.is_none());
1310    }
1311
1312    #[async_test]
1313    async fn test_save_event() {
1314        let client = logged_in_client(None).await;
1315        let room_id = room_id!("!galette:saucisse.bzh");
1316
1317        let event_cache = client.event_cache();
1318        event_cache.subscribe().unwrap();
1319
1320        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1321        let event_id = event_id!("$1");
1322
1323        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1324        let room = client.get_room(room_id).unwrap();
1325
1326        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1327        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;
1328
1329        // Retrieving the event at the room-wide cache works.
1330        assert!(room_event_cache.find_event(event_id).await.is_some());
1331    }
1332
1333    #[async_test]
1334    async fn test_generic_update_when_loading_rooms() {
1335        // Create 2 rooms. One of them has data in the event cache storage.
1336        let user = user_id!("@mnt_io:matrix.org");
1337        let client = logged_in_client(None).await;
1338        let room_id_0 = room_id!("!raclette:patate.ch");
1339        let room_id_1 = room_id!("!fondue:patate.ch");
1340
1341        let event_factory = EventFactory::new().room(room_id_0).sender(user);
1342
1343        let event_cache = client.event_cache();
1344        event_cache.subscribe().unwrap();
1345
1346        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
1347        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
1348
        client
            .event_cache_store()
            .lock()
            .await
            .unwrap()
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id_0),
                vec![
                    // Non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event_factory
                            .text_msg("hello")
                            .sender(user)
                            .event_id(event_id!("$ev0"))
                            .into_event()],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Room 0 has initial data, so it must trigger a generic update.
        {
            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();

            assert_matches!(
                generic_stream.recv().await,
                Ok(RoomEventCacheGenericUpdate { room_id }) => {
                    assert_eq!(room_id, room_id_0);
                }
            );
        }

        // Room 1 has NO initial data, so nothing should happen.
        {
            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();

            assert!(generic_stream.recv().now_or_never().is_none());
        }
    }

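    /// Check that a backwards pagination emits a generic update only when it
    /// actually loads new events into the room's timeline.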
    #[async_test]
    async fn test_generic_update_when_paginating_room() {
        // Create 1 room, with 4 chunks in the event cache storage.
        let user = user_id!("@mnt_io:matrix.org");
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_factory = EventFactory::new().room(room_id).sender(user);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);

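        // Pre-fill the event cache store with two empty chunks followed by two
        // chunks of one event each, so that several back-paginations are needed
        // to reach the start of the timeline.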
        client
            .event_cache_store()
            .lock()
            .await
            .unwrap()
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // Empty chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // Empty chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    // Non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![event_factory
                            .text_msg("hello")
                            .sender(user)
                            .event_id(event_id!("$ev0"))
                            .into_event()],
                    },
                    // Non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(2)),
                        new: ChunkIdentifier::new(3),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(3), 0),
                        items: vec![event_factory
                            .text_msg("world")
                            .sender(user)
                            .event_id(event_id!("$ev1"))
                            .into_event()],
                    },
                ],
            )
            .await
            .unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // The room is initialised: it gets one event in the timeline, which
        // triggers a generic update.
        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let pagination = room_event_cache.pagination();

        // Paginate: one new event is added to the timeline, which triggers a
        // generic update.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert_eq!(pagination_outcome.events.len(), 1);
        assert!(pagination_outcome.reached_start.not());
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        // Paginate again: no new events are added to the timeline, so no
        // generic update is emitted.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.events.is_empty());
        assert!(pagination_outcome.reached_start.not());
        assert!(generic_stream.recv().now_or_never().is_none());

        // Paginate once more, just to check the scenario is correct: the start
        // of the timeline is reached, and no generic update is emitted.
        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();

        assert!(pagination_outcome.reached_start);
        assert!(generic_stream.recv().now_or_never().is_none());
    }

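    /// Check that `EventCache::for_room` returns `EventCacheError::RoomNotFound`
    /// for an unknown room, and succeeds once the room is known to the client.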
    #[async_test]
    async fn test_for_room_when_room_is_not_found() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!raclette:patate.ch");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // The room doesn't exist yet, so `for_room` returns an error.
        assert_matches!(
            event_cache.for_room(room_id).await,
            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
                assert_eq!(room_id, not_found_room_id);
            }
        );

        // Now create the room.
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        // The room now exists, so `for_room` succeeds.
        assert!(event_cache.for_room(room_id).await.is_ok());
    }

    /// Test that the event cache doesn't create reference cycles, nor spawn
    /// tasks that retain a strong reference to it indefinitely, which would
    /// prevent it from being deallocated.
    #[cfg(not(target_family = "wasm"))]
    #[async_test]
    async fn test_no_refcycle_event_cache_tasks() {
        let client = MockClientBuilder::new(None).build().await;

        // Wait for the init tasks to die.
        sleep(Duration::from_secs(1)).await;

        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
        assert_eq!(event_cache_weak.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            // Make the client aware of the room.
            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Give a bit of time for background tasks to die.
        sleep(Duration::from_secs(1)).await;

        // No strong references should remain now that the Client has been
        // dropped.
        assert_eq!(
            event_cache_weak.strong_count(),
            0,
            "Too many strong references to the event cache {}",
            event_cache_weak.strong_count()
        );
    }
}