matrix_sdk/event_cache/mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The event cache is an abstraction layer, sitting between the Rust SDK and a
16//! final client, that acts as a global observer of all the rooms, gathering and
17//! inferring some extra useful information about each room. In particular, this
18//! doesn't require subscribing to a specific room to get access to this
19//! information.
20//!
21//! It's intended to be fast, robust and easy to maintain, having learned from
22//! previous endeavours at implementing middle to high level features elsewhere
23//! in the SDK, notably in the UI's Timeline object.
24//!
25//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
26//! details about the historical reasons that led us to start writing this.
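//!
//! A minimal usage sketch, following the calls exercised in the tests at the
//! bottom of this file (error handling elided, so treat it as illustrative
//! rather than definitive):
//!
//! ```ignore
//! // Start listening to sync responses, once per client.
//! client.event_cache().subscribe()?;
//!
//! // Get a room-wide view of the cache, and subscribe to its updates.
//! let (room_event_cache, _drop_handles) = room.event_cache().await?;
//! let (initial_events, mut updates) = room_event_cache.subscribe().await?;
//!
//! while let Ok(update) = updates.recv().await {
//!     // React to `RoomEventCacheUpdate`s here.
//! }
//! ```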
27
28#![forbid(missing_docs)]
29
30use std::{
31    collections::{BTreeMap, HashMap},
32    fmt,
33    sync::{Arc, OnceLock, Weak},
34};
35
36use eyeball::{SharedObservable, Subscriber};
37use eyeball_im::VectorDiff;
38use futures_util::future::{join_all, try_join_all};
39use matrix_sdk_base::{
40    ThreadingSupport,
41    cross_process_lock::CrossProcessLockError,
42    deserialized_responses::{AmbiguityChange, TimelineEvent},
43    event_cache::{
44        Gap,
45        store::{EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockState},
46    },
47    linked_chunk::{self, OwnedLinkedChunkId, lazy_loader::LazyLoaderError},
48    serde_helpers::extract_thread_root_from_content,
49    sync::RoomUpdates,
50    task_monitor::BackgroundTaskHandle,
51    timer,
52};
53use ruma::{
54    OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId, events::AnySyncEphemeralRoomEvent,
55    serde::Raw,
56};
57use tokio::{
58    select,
59    sync::{
60        Mutex, RwLock,
61        broadcast::{Receiver, Sender, channel, error::RecvError},
62        mpsc,
63    },
64};
65use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, trace, warn};
66
67use crate::{
68    Client,
69    client::{ClientInner, WeakClient},
70    event_cache::room::RoomEventCacheStateLock,
71    send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
72};
73
74mod deduplicator;
75mod pagination;
76#[cfg(feature = "e2e-encryption")]
77mod redecryptor;
78mod room;
79
80pub use pagination::{RoomPagination, RoomPaginationStatus};
81#[cfg(feature = "e2e-encryption")]
82pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport};
83pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate};
84
85/// An error observed in the [`EventCache`].
86#[derive(thiserror::Error, Debug)]
87pub enum EventCacheError {
88    /// The [`EventCache`] instance hasn't been initialized with
89    /// [`EventCache::subscribe`]
90    #[error(
91        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
92    )]
93    NotSubscribedYet,
94
95    /// Room is not found.
96    #[error("Room `{room_id}` is not found.")]
97    RoomNotFound {
98        /// The ID of the room that was not found.
99        room_id: OwnedRoomId,
100    },
101
102    /// An error has been observed while back-paginating.
103    #[error(transparent)]
104    BackpaginationError(Box<crate::Error>),
105
106    /// Back-pagination was already happening in a given room, where we tried to
107    /// back-paginate again.
108    #[error("We were already back-paginating.")]
109    AlreadyBackpaginating,
110
111    /// An error happening when interacting with storage.
112    #[error(transparent)]
113    Storage(#[from] EventCacheStoreError),
114
115    /// An error happening when attempting to (cross-process) lock storage.
116    #[error(transparent)]
117    LockingStorage(#[from] CrossProcessLockError),
118
119    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
120    /// to. It's possible this weak reference points to nothing anymore, at
121    /// times when we try to use the client.
122    #[error("The owning client of the event cache has been dropped.")]
123    ClientDropped,
124
125    /// An error happening when interacting with the [`LinkedChunk`]'s lazy
126    /// loader.
127    ///
128    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
129    #[error(transparent)]
130    LinkedChunkLoader(#[from] LazyLoaderError),
131
132    /// An error happened when reading the metadata of a linked chunk, upon
133    /// reload.
134    #[error("the linked chunk metadata is invalid: {details}")]
135    InvalidLinkedChunkMetadata {
136        /// A string containing details about the error.
137        details: String,
138    },
139}
140
141/// A result using the [`EventCacheError`].
142pub type Result<T> = std::result::Result<T, EventCacheError>;
143
144/// Holds handles to the tasks spawned by an [`EventCache`].
145pub struct EventCacheDropHandles {
146    /// Task that listens to room updates.
147    listen_updates_task: BackgroundTaskHandle,
148
149    /// Task that listens to updates to the user's ignored list.
150    ignore_user_list_update_task: BackgroundTaskHandle,
151
152    /// The task used to automatically shrink the linked chunks.
153    auto_shrink_linked_chunk_task: BackgroundTaskHandle,
154
155    /// The task used to automatically redecrypt UTDs.
156    #[cfg(feature = "e2e-encryption")]
157    _redecryptor: redecryptor::Redecryptor,
158}
159
160impl fmt::Debug for EventCacheDropHandles {
161    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
163    }
164}
165
166impl Drop for EventCacheDropHandles {
167    fn drop(&mut self) {
168        self.listen_updates_task.abort();
169        self.ignore_user_list_update_task.abort();
170        self.auto_shrink_linked_chunk_task.abort();
171    }
172}
173
174/// An event cache, providing lots of useful functionality for clients.
175///
176/// Cloning is shallow, and thus is cheap to do.
177///
178/// See also the module-level comment.
179#[derive(Clone)]
180pub struct EventCache {
181    /// Reference to the inner cache.
182    inner: Arc<EventCacheInner>,
183}
184
185impl fmt::Debug for EventCache {
186    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187        f.debug_struct("EventCache").finish_non_exhaustive()
188    }
189}
190
191impl EventCache {
192    /// Create a new [`EventCache`] for the given client.
193    pub(crate) fn new(client: &Arc<ClientInner>, event_cache_store: EventCacheStoreLock) -> Self {
194        let (generic_update_sender, _) = channel(128);
195        let (linked_chunk_update_sender, _) = channel(128);
196
197        let weak_client = WeakClient::from_inner(client);
198
199        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(128);
200        let thread_subscriber_task = client
201            .task_monitor
202            .spawn_background_task(
203                "event_cache::thread_subscriber",
204                Self::thread_subscriber_task(
205                    weak_client.clone(),
206                    linked_chunk_update_sender.clone(),
207                    thread_subscriber_sender,
208                ),
209            )
210            .abort_on_drop();
211
212        #[cfg(feature = "experimental-search")]
213        let search_indexing_task = client
214            .task_monitor
215            .spawn_background_task(
216                "event_cache::search_indexing",
217                Self::search_indexing_task(weak_client.clone(), linked_chunk_update_sender.clone()),
218            )
219            .abort_on_drop();
220
221        #[cfg(feature = "e2e-encryption")]
222        let redecryption_channels = redecryptor::RedecryptorChannels::new();
223
224        Self {
225            inner: Arc::new(EventCacheInner {
226                client: weak_client,
227                store: event_cache_store,
228                multiple_room_updates_lock: Default::default(),
229                by_room: Default::default(),
230                drop_handles: Default::default(),
231                auto_shrink_sender: Default::default(),
232                generic_update_sender,
233                linked_chunk_update_sender,
234                _thread_subscriber_task: thread_subscriber_task,
235                #[cfg(feature = "experimental-search")]
236                _search_indexing_task: search_indexing_task,
237                #[cfg(feature = "e2e-encryption")]
238                redecryption_channels,
239                thread_subscriber_receiver,
240            }),
241        }
242    }
243
244    /// Subscribes to updates that a thread subscription has been sent.
245    ///
246    /// For testing purposes only.
247    #[doc(hidden)]
248    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
249        self.inner.thread_subscriber_receiver.resubscribe()
250    }
251
252    /// Starts subscribing the [`EventCache`] to sync responses, if not done
253    /// before.
254    ///
255    /// Re-running this has no effect if we already subscribed before, and is
256    /// cheap.
257    pub fn subscribe(&self) -> Result<()> {
258        let client = self.inner.client()?;
259
260        // Initialize the drop handles.
261        let _ = self.inner.drop_handles.get_or_init(|| {
262            let task_monitor = client.task_monitor();
263
264            // Spawn the task that will listen to all the room updates at once.
265            let listen_updates_task = task_monitor.spawn_background_task("event_cache::listen_updates", Self::listen_task(
266                self.inner.clone(),
267                client.subscribe_to_all_room_updates(),
268            ));
269
270            let ignore_user_list_update_task = task_monitor.spawn_background_task("event_cache::ignore_user_list_update_task", Self::ignore_user_list_update_task(
271                self.inner.clone(),
272                client.subscribe_to_ignore_user_list_changes(),
273            ));
274
275            let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);
276
277            // Force-initialize the sender in the [`EventCacheInner`].
278            self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);
279
280            let auto_shrink_linked_chunk_task = task_monitor.spawn_background_task("event_cache::auto_shrink_linked_chunk_task", Self::auto_shrink_linked_chunk_task(
281                Arc::downgrade(&self.inner),
282                auto_shrink_receiver,
283            ));
284
285            #[cfg(feature = "e2e-encryption")]
286            let redecryptor = {
287                let receiver = self
288                    .inner
289                    .redecryption_channels
290                    .decryption_request_receiver
291                    .lock()
292                    .take()
293                    .expect("We should have initialized the channel and subscribing should happen only once");
294
295                redecryptor::Redecryptor::new(&client, Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender)
296            };
297
298
299            Arc::new(EventCacheDropHandles {
300                listen_updates_task,
301                ignore_user_list_update_task,
302                auto_shrink_linked_chunk_task,
303                #[cfg(feature = "e2e-encryption")]
304                _redecryptor: redecryptor,
305            })
306        });
307
308        Ok(())
309    }
310
311    #[instrument(skip_all)]
312    async fn ignore_user_list_update_task(
313        inner: Arc<EventCacheInner>,
314        mut ignore_user_list_stream: Subscriber<Vec<String>>,
315    ) {
316        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
317        span.follows_from(Span::current());
318
319        async move {
320            while ignore_user_list_stream.next().await.is_some() {
321                info!("Received an ignore user list change");
322                if let Err(err) = inner.clear_all_rooms().await {
323                    error!("when clearing room storage after ignore user list change: {err}");
324                }
325            }
326            info!("Ignore user list stream has closed");
327        }
328        .instrument(span)
329        .await;
330    }
331
332    /// For benchmarking purposes only.
333    #[doc(hidden)]
334    pub async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
335        self.inner.handle_room_updates(updates).await
336    }
337
338    #[instrument(skip_all)]
339    async fn listen_task(
340        inner: Arc<EventCacheInner>,
341        mut room_updates_feed: Receiver<RoomUpdates>,
342    ) {
343        trace!("Spawning the listen task");
344        loop {
345            match room_updates_feed.recv().await {
346                Ok(updates) => {
347                    trace!("Receiving `RoomUpdates`");
348
349                    if let Err(err) = inner.handle_room_updates(updates).await {
350                        match err {
351                            EventCacheError::ClientDropped => {
352                                // The client has dropped, exit the listen task.
353                                info!(
354                                    "Closing the event cache global listen task because client dropped"
355                                );
356                                break;
357                            }
358                            err => {
359                                error!("Error when handling room updates: {err}");
360                            }
361                        }
362                    }
363                }
364
365                Err(RecvError::Lagged(num_skipped)) => {
366                    // Forget everything we know; we could have missed events, and we have
367                    // no way to reconcile at the moment!
368                    // TODO: implement Smart Matching™,
369                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
370                    if let Err(err) = inner.clear_all_rooms().await {
371                        error!("when clearing storage after lag in listen_task: {err}");
372                    }
373                }
374
375                Err(RecvError::Closed) => {
376                    // The sender has shut down, exit.
377                    info!("Closing the event cache global listen task because receiver closed");
378                    break;
379                }
380            }
381        }
382    }
383
384    /// Spawns the task that will listen to auto-shrink notifications.
385    ///
386    /// The auto-shrink mechanism works this way:
387    ///
388    /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
389    ///   increment the active number of subscribers to that room, aka
390    ///   [`RoomEventCacheState::subscriber_count`].
391    /// - When that subscriber is dropped, it will decrement that count; and
392    ///   notify the task below if it reached 0.
393    /// - The task spawned here, owned by the [`EventCacheInner`], will listen
394    ///   to such notifications that a room may be shrunk. It will attempt an
395    ///   auto-shrink, by letting the inner state decide whether this is a good
396    ///   time to do so (new subscribers might have spawned in the meanwhile).
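    ///
    /// A rough sketch of the sending side, for illustration only (the actual
    /// code lives in `room.rs`; the field and method names below are
    /// hypothetical, not the real ones):
    ///
    /// ```ignore
    /// impl Drop for RoomEventCacheSubscriber {
    ///     fn drop(&mut self) {
    ///         // Decrement the number of active subscribers for this room…
    ///         let previous = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
    ///         // …and if we were the last one, nudge the auto-shrink task.
    ///         if previous == 1 {
    ///             let _ = self.auto_shrink_sender.try_send(self.room_id.clone());
    ///         }
    ///     }
    /// }
    /// ```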
397    #[instrument(skip_all)]
398    async fn auto_shrink_linked_chunk_task(
399        inner: Weak<EventCacheInner>,
400        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
401    ) {
402        while let Some(room_id) = rx.recv().await {
403            trace!(for_room = %room_id, "received notification to shrink");
404
405            let Some(inner) = inner.upgrade() else {
406                return;
407            };
408
409            let room = match inner.for_room(&room_id).await {
410                Ok(room) => room,
411                Err(err) => {
412                    warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
413                    continue;
414                }
415            };
416
417            trace!("waiting for state lock…");
418            let mut state = match room.inner.state.write().await {
419                Ok(state) => state,
420                Err(err) => {
421                    warn!(for_room = %room_id, "Failed to get the `RoomEventCacheStateLock`: {err}");
422                    continue;
423                }
424            };
425
426            match state.auto_shrink_if_no_subscribers().await {
427                Ok(diffs) => {
428                    if let Some(diffs) = diffs {
429                        // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
430                        // subscribers, right? RIGHT? Especially because the state is guarded behind
431                        // a lock.
432                        //
433                        // However, better safe than sorry, and it's cheap to send an update here,
434                        // so let's do it!
435                        if !diffs.is_empty() {
436                            let _ = room.inner.update_sender.send(
437                                RoomEventCacheUpdate::UpdateTimelineEvents {
438                                    diffs,
439                                    origin: EventsOrigin::Cache,
440                                },
441                            );
442                        }
443                    } else {
444                        debug!("auto-shrinking didn't happen");
445                    }
446                }
447
448                Err(err) => {
449                    // There's not much we can do here, unfortunately.
450                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
451                }
452            }
453        }
454
455        info!("Auto-shrink linked chunk task has been closed, exiting");
456    }
457
458    /// Check whether [`EventCache::subscribe`] has been called.
459    pub fn has_subscribed(&self) -> bool {
460        self.inner.drop_handles.get().is_some()
461    }
462
463    /// Return a room-specific view over the [`EventCache`].
464    pub(crate) async fn for_room(
465        &self,
466        room_id: &RoomId,
467    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
468        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
469            return Err(EventCacheError::NotSubscribedYet);
470        };
471
472        let room = self.inner.for_room(room_id).await?;
473
474        Ok((room, drop_handles))
475    }
476
477    /// Cleanly clear all the rooms' event caches.
478    ///
479    /// This will notify any live observers that the room has been cleared.
480    pub async fn clear_all_rooms(&self) -> Result<()> {
481        self.inner.clear_all_rooms().await
482    }
483
484    /// Subscribe to room _generic_ updates.
485    ///
486    /// If one wants to listen to what has changed in a specific room,
487    /// [`RoomEventCache::subscribe`] is recommended. However, note that the
488    /// [`RoomEventCacheSubscriber`] type triggers side-effects.
489    ///
490    /// If one wants high-level, generic updates for rooms, without
491    /// side-effects, this method is recommended. Also, dropping the
492    /// receiver of this channel will not trigger any side-effect.
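    ///
    /// A minimal sketch, assuming `event_cache` has already been subscribed
    /// with [`EventCache::subscribe`]:
    ///
    /// ```ignore
    /// let mut generic_updates = event_cache.subscribe_to_room_generic_updates();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(update) = generic_updates.recv().await {
    ///         // Something changed in this room; re-read it if needed.
    ///         tracing::debug!("new generic update for room {}", update.room_id);
    ///     }
    /// });
    /// ```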
493    pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
494        self.inner.generic_update_sender.subscribe()
495    }
496
497    /// React to a given linked chunk update by subscribing the user to a
498    /// thread, if need be (when the user got mentioned in a thread reply, for
499    /// a thread they were not subscribed to).
500    ///
501    /// Returns a boolean indicating whether the task should keep on running or
502    /// not.
503    #[instrument(skip(client, thread_subscriber_sender))]
504    async fn handle_thread_subscriber_linked_chunk_update(
505        client: &WeakClient,
506        thread_subscriber_sender: &Sender<()>,
507        up: RoomEventCacheLinkedChunkUpdate,
508    ) -> bool {
509        let Some(client) = client.get() else {
510            // Client shutting down.
511            debug!("Client is shutting down, exiting thread subscriber task");
512            return false;
513        };
514
515        let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
516            trace!("received an update for a non-thread linked chunk, ignoring");
517            return true;
518        };
519
520        let Some(room) = client.get_room(room_id) else {
521            warn!(%room_id, "unknown room");
522            return true;
523        };
524
525        let thread_root = thread_root.clone();
526
527        let mut new_events = up.events().peekable();
528
529        if new_events.peek().is_none() {
530            // No new events, nothing to do.
531            return true;
532        }
533
534        // This `PushContext` is going to be used to compute whether an in-thread event
535        // would trigger a mention.
536        //
537        // Of course, we're not interested in an in-thread event causing a mention,
538        // because it's part of a thread we've subscribed to. So the
539        // `PushContext` must not include the check for thread subscriptions (otherwise
540        // it would be impossible to subscribe to new threads).
541
542        let with_thread_subscriptions = false;
543
544        let Some(push_context) = room
545            .push_context_internal(with_thread_subscriptions)
546            .await
547            .inspect_err(|err| {
548                warn!("Failed to get push context for threads: {err}");
549            })
550            .ok()
551            .flatten()
552        else {
553            warn!("Missing push context for thread subscriptions.");
554            return true;
555        };
556
557        let mut subscribe_up_to = None;
558
559        // Find if there's an event that would trigger a mention for the current
560        // user, iterating from the end of the new events towards the oldest, so we can
561        // find the most recent event to subscribe to.
562        for ev in new_events.rev() {
563            if push_context
564                .for_event(ev.raw())
565                .await
566                .into_iter()
567                .any(|action| action.should_notify())
568            {
569                let Some(event_id) = ev.event_id() else {
570                    // Shouldn't happen.
571                    continue;
572                };
573                subscribe_up_to = Some(event_id);
574                break;
575            }
576        }
577
578        // And if we've found such a mention, subscribe to the thread up to this
579        // event.
580        if let Some(event_id) = subscribe_up_to {
581            trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
582            if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
583                warn!(%err, "Failed to subscribe to thread");
584            } else {
585                let _ = thread_subscriber_sender.send(());
586            }
587        }
588
589        true
590    }
591
592    /// React to a given send queue update by subscribing the user to a
593    /// thread, if need be (when the user sent an event in a thread they were
594    /// not subscribed to).
595    ///
596    /// Returns a boolean indicating whether the task should keep on running or
597    /// not.
598    #[instrument(skip(client, thread_subscriber_sender))]
599    async fn handle_thread_subscriber_send_queue_update(
600        client: &WeakClient,
601        thread_subscriber_sender: &Sender<()>,
602        events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
603        up: SendQueueUpdate,
604    ) -> bool {
605        let Some(client) = client.get() else {
606            // Client shutting down.
607            debug!("Client is shutting down, exiting thread subscriber task");
608            return false;
609        };
610
611        let room_id = up.room_id;
612        let Some(room) = client.get_room(&room_id) else {
613            warn!(%room_id, "unknown room");
614            return true;
615        };
616
617        let (thread_root, subscribe_up_to) = match up.update {
618            RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
619                match local_echo.content {
620                    LocalEchoContent::Event { serialized_event, .. } => {
621                        if let Some(thread_root) =
622                            extract_thread_root_from_content(serialized_event.into_raw().0)
623                        {
624                            events_being_sent.insert(local_echo.transaction_id, thread_root);
625                        }
626                    }
627                    LocalEchoContent::React { .. } => {
628                        // Nothing to do, reactions don't count as a thread
629                        // subscription.
630                    }
631                }
632                return true;
633            }
634
635            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
636                events_being_sent.remove(&transaction_id);
637                return true;
638            }
639
640            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
641                if let Some(thread_root) =
642                    extract_thread_root_from_content(new_content.into_raw().0)
643                {
644                    events_being_sent.insert(transaction_id, thread_root);
645                } else {
646                    // It could be that the event isn't part of a thread anymore; handle that by
647                    // removing the pending transaction id.
648                    events_being_sent.remove(&transaction_id);
649                }
650                return true;
651            }
652
653            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
654                if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
655                    (thread_root, event_id)
656                } else {
657                    // We don't know about the event that has been sent, so ignore it.
658                    trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
659                    return true;
660                }
661            }
662
663            RoomSendQueueUpdate::SendError { .. }
664            | RoomSendQueueUpdate::RetryEvent { .. }
665            | RoomSendQueueUpdate::MediaUpload { .. } => {
666                // Nothing to do for these bad boys.
667                return true;
668            }
669        };
670
671        // We've found an in-thread event sent by the user: subscribe to the thread up to it.
672        trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");
673        if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await
674        {
675            warn!(%err, "Failed to subscribe to thread");
676        } else {
677            let _ = thread_subscriber_sender.send(());
678        }
679
680        true
681    }
682
683    #[instrument(skip_all)]
684    async fn thread_subscriber_task(
685        client: WeakClient,
686        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
687        thread_subscriber_sender: Sender<()>,
688    ) {
689        let mut send_q_rx = if let Some(client) = client.get() {
690            if !client.enabled_thread_subscriptions() {
691                trace!("Thread subscriptions are not enabled, not spawning thread subscriber task");
692                return;
693            }
694
695            client.send_queue().subscribe()
696        } else {
697            trace!("Client is shutting down, not spawning thread subscriber task");
698            return;
699        };
700
701        let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();
702
703        // A mapping of local echoes (events being sent) to their thread root, if
704        // they're in-thread replies.
705        //
706        // Entirely managed by `handle_thread_subscriber_send_queue_update`.
707        let mut events_being_sent = HashMap::new();
708
709        loop {
710            select! {
711                res = send_q_rx.recv() => {
712                    match res {
713                        Ok(up) => {
714                            if !Self::handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
715                                break;
716                            }
717                        }
718                        Err(RecvError::Closed) => {
719                            debug!("Send queue update channel has been closed, exiting thread subscriber task");
720                            break;
721                        }
722                        Err(RecvError::Lagged(num_skipped)) => {
723                            warn!(num_skipped, "Lagged behind send queue updates");
724                        }
725                    }
726                }
727
728                res = linked_chunk_rx.recv() => {
729                    match res {
730                        Ok(up) => {
731                            if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
732                                break;
733                            }
734                        }
735                        Err(RecvError::Closed) => {
736                            debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
737                            break;
738                        }
739                        Err(RecvError::Lagged(num_skipped)) => {
740                            warn!(num_skipped, "Lagged behind linked chunk updates");
741                        }
742                    }
743                }
744            }
745        }
746    }
747
748    /// Takes a [`TimelineEvent`] and passes it to the [`RoomIndex`] of the
749    /// given room which will add/remove/edit an event in the index based on
750    /// the event type.
751    #[cfg(feature = "experimental-search")]
752    #[instrument(skip_all)]
753    async fn search_indexing_task(
754        client: WeakClient,
755        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
756    ) {
757        let mut linked_chunk_update_receiver = linked_chunk_update_sender.subscribe();
758
759        loop {
760            match linked_chunk_update_receiver.recv().await {
761                Ok(room_ec_lc_update) => {
762                    let OwnedLinkedChunkId::Room(room_id) =
763                        room_ec_lc_update.linked_chunk_id.clone()
764                    else {
765                        trace!("Received non-room updates, ignoring.");
766                        continue;
767                    };
768
769                    let mut timeline_events = room_ec_lc_update.events().peekable();
770
771                    if timeline_events.peek().is_none() {
772                        continue;
773                    }
774
775                    let Some(client) = client.get() else {
776                        trace!("Client is shutting down, exiting the search indexing task");
777                        return;
778                    };
779
780                    let maybe_room_cache = client.event_cache().for_room(&room_id).await;
781                    let Ok((room_cache, _drop_handles)) = maybe_room_cache else {
782                        warn!(for_room = %room_id, "Failed to get RoomEventCache: {maybe_room_cache:?}");
783                        continue;
784                    };
785
786                    let maybe_room = client.get_room(&room_id);
787                    let Some(room) = maybe_room else {
788                        warn!(get_room = %room_id, "Failed to get room while indexing: {maybe_room:?}");
789                        continue;
790                    };
791                    let redaction_rules =
792                        room.clone_info().room_version_rules_or_default().redaction;
793
794                    let mut search_index_guard = client.search_index().lock().await;
795
796                    if let Err(err) = search_index_guard
797                        .bulk_handle_timeline_event(
798                            timeline_events,
799                            &room_cache,
800                            &room_id,
801                            &redaction_rules,
802                        )
803                        .await
804                    {
805                        error!("Failed to handle events for indexing: {err}")
806                    }
807                }
808                Err(RecvError::Closed) => {
809                    debug!(
810                        "Linked chunk update channel has been closed, exiting the search indexing task"
811                    );
812                    break;
813                }
814                Err(RecvError::Lagged(num_skipped)) => {
815                    warn!(num_skipped, "Lagged behind linked chunk updates");
816                }
817            }
818        }
819    }
820}
821
822struct EventCacheInner {
823    /// A weak reference to the inner client, useful when trying to get a handle
824    /// on the owning client.
825    client: WeakClient,
826
827    /// Reference to the underlying store.
828    store: EventCacheStoreLock,
829
830    /// A lock used when many rooms must be updated at once.
831    ///
832    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
833    /// ensure that multiple updates will be applied in the correct order, which
834    /// is enforced by taking this lock when handling an update.
835    // TODO: that's the place to add a cross-process lock!
836    multiple_room_updates_lock: Mutex<()>,
837
838    /// Lazily-filled cache of live [`RoomEventCache`], once per room.
839    by_room: RwLock<HashMap<OwnedRoomId, RoomEventCache>>,
840
841    /// Handles to keep alive the task listening to updates.
842    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
843
844    /// A sender for notifications that a room *may* need to be auto-shrunk.
845    ///
846    /// Needs to live here, so it may be passed to each [`RoomEventCache`]
847    /// instance.
848    ///
849    /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
850    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,
851
852    /// A sender for room generic updates.
853    ///
854    /// See doc comment of [`RoomEventCacheGenericUpdate`] and
855    /// [`EventCache::subscribe_to_room_generic_updates`].
856    generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
857
858    /// A sender for a persisted linked chunk update.
859    ///
860    /// This is used to notify observers that *any* linked chunk has persisted
861    /// updates to the store, during a sync or a back-pagination. Observers can
862    /// use it to look for new events.
863    ///
864    /// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
865    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
866
867    /// A background task listening to room and send queue updates, and
868    /// automatically subscribing the user to threads when needed, based on
869    /// the semantics of MSC4306.
870    ///
871    /// One important constraint is that there is only one such task per
872    /// [`EventCache`], so it does listen to *all* rooms at the same time.
873    _thread_subscriber_task: BackgroundTaskHandle,
874
875    /// A background task listening to room updates, and
876    /// automatically handling search index operations add/remove/edit
877    /// depending on the event type.
878    ///
879    /// One important constraint is that there is only one such task per
880    /// [`EventCache`], so it does listen to *all* rooms at the same time.
881    #[cfg(feature = "experimental-search")]
882    _search_indexing_task: BackgroundTaskHandle,
883
884    /// A test helper receiver that gets a notification every time the thread
885    /// subscriber task subscribes to a new thread.
886    ///
887    /// This is helpful for tests to coordinate that a new thread subscription
888    /// has been sent or not.
889    thread_subscriber_receiver: Receiver<()>,
890
891    #[cfg(feature = "e2e-encryption")]
892    redecryption_channels: redecryptor::RedecryptorChannels,
893}
894
895type AutoShrinkChannelPayload = OwnedRoomId;
896
897impl EventCacheInner {
898    fn client(&self) -> Result<Client> {
899        self.client.get().ok_or(EventCacheError::ClientDropped)
900    }
901
902    /// Clears all the room's data.
903    async fn clear_all_rooms(&self) -> Result<()> {
904        // Okay, here's where things get complicated.
905        //
906        // On the one hand, `by_room` may include storage for *some* rooms that we know
907        // about, but not *all* of them. Any room that hasn't been loaded in the
908        // client, or touched by a sync, will remain unloaded in memory, so it
909        // will be missing from `self.by_room`. As a result, we need to make
910        // sure that we're hitting the storage backend to *really* clear all the
911        // rooms, including those that haven't been loaded yet.
912        //
913        // On the other hand, one must NOT clear the `by_room` map, because if someone
914        // subscribed to a room update, they would never get any new update for
915        // that room, since re-creating the `RoomEventCache` would create a new,
916        // unrelated sender.
917        //
918        // So we need to *keep* the rooms in `by_room` alive, while clearing them in the
919        // store backend.
920        //
921        // As a result, for a short while, the in-memory linked chunks
922        // will be desynchronized from the storage. We need to be careful then. During
923        // that short while, we don't want *anyone* to touch the linked chunk
924        // (be it in memory or in the storage).
925        //
926        // And since that requirement applies to *any* room in `by_room` at the same
927        // time, we'll have to take the locks for *all* the live rooms, so as to
928        // properly clear the underlying storage.
929        //
930        // At this point, you might be scared about the potential for deadlocking. I am
931        // as well, but I'm convinced we're fine:
932        // 1. the lock for `by_room` is usually held only for a short while, and
933        //    independently of the other two kinds.
934        // 2. the state may acquire the store cross-process lock internally, but only
935        //    while the state's methods are called (so it's always transient). As a
936        //    result, as soon as we've acquired the state locks, the store lock ought to
937        //    be free.
938        // 3. The store lock is held explicitly only in a small scoped area below.
939        // 4. Then the store lock will be held internally when calling `reset()`, but at
940        //    this point it's only held for a short while each time, so rooms will take
941        //    turn to acquire it.
942
943        let rooms = self.by_room.write().await;
944
945        // Collect all the rooms' state locks, first: we can clear the storage only when
946        // nobody will touch it at the same time.
947        let room_locks = join_all(
948            rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
949        )
950        .await;
951
952        // Clear the storage for all the rooms, using the storage facility.
953        let store_guard = match self.store.lock().await? {
954            EventCacheStoreLockState::Clean(store_guard) => store_guard,
955            EventCacheStoreLockState::Dirty(store_guard) => store_guard,
956        };
957        store_guard.clear_all_linked_chunks().await?;
958
959        // At this point, all the in-memory linked chunks are desynchronized from the
960        // storage. Resynchronize them manually by calling reset(), and
961        // propagate updates to observers.
962        try_join_all(room_locks.into_iter().map(|(room, state_guard)| async move {
963            let mut state_guard = state_guard?;
964            let updates_as_vector_diffs = state_guard.reset().await?;
965
966            let _ = room.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
967                diffs: updates_as_vector_diffs,
968                origin: EventsOrigin::Cache,
969            });
970
971            let _ = room
972                .inner
973                .generic_update_sender
974                .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() });
975
976            Ok::<_, EventCacheError>(())
977        }))
978        .await?;
979
980        Ok(())
981    }
982
983    /// Handles a single set of room updates at once.
984    #[instrument(skip(self, updates))]
985    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
986        // First, take the lock that indicates we're processing updates, to avoid
987        // handling multiple updates concurrently.
988        let _lock = {
989            let _timer = timer!("Taking the `multiple_room_updates_lock`");
990            self.multiple_room_updates_lock.lock().await
991        };
992
993        // NOTE: bnjbvr tried to make this concurrent at some point, but it turned out
994        // to be a performance regression, even for large sync updates. Lacking
995        // time to investigate, this code remains sequential for now. See also
996        // https://github.com/matrix-org/matrix-rust-sdk/pull/5426.
997
998        // Left rooms.
999        for (room_id, left_room_update) in updates.left {
1000            let Ok(room) = self.for_room(&room_id).await else {
1001                error!(?room_id, "Room must exist");
1002                continue;
1003            };
1004
1005            if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
1006                // Non-fatal error, try to continue to the next room.
1007                error!("handling left room update: {err}");
1008            }
1009        }
1010
1011        // Joined rooms.
1012        for (room_id, joined_room_update) in updates.joined {
1013            trace!(?room_id, "Handling a `JoinedRoomUpdate`");
1014
1015            let Ok(room) = self.for_room(&room_id).await else {
1016                error!(?room_id, "Room must exist");
1017                continue;
1018            };
1019
1020            if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
1021                // Non-fatal error, try to continue to the next room.
1022                error!(%room_id, "handling joined room update: {err}");
1023            }
1024        }
1025
1026        // Invited rooms.
1027        // TODO: we don't do anything with `updates.invite` at this point.
1028
1029        Ok(())
1030    }
1031
1032    /// Return a room-specific view over the [`EventCache`].
1033    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
1034        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
1035        // write lock.
1036        let by_room_guard = self.by_room.read().await;
1037
1038        match by_room_guard.get(room_id) {
1039            Some(room) => Ok(room.clone()),
1040
1041            None => {
1042                // Slow-path: the entry doesn't exist; let's acquire a write lock.
1043                drop(by_room_guard);
1044                let mut by_room_guard = self.by_room.write().await;
1045
1046                // In the meanwhile, some other caller might have obtained write access and done
1047                // the same, so check for existence again.
1048                if let Some(room) = by_room_guard.get(room_id) {
1049                    return Ok(room.clone());
1050                }
1051
1052                let pagination_status =
1053                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });
1054
1055                let Some(client) = self.client.get() else {
1056                    return Err(EventCacheError::ClientDropped);
1057                };
1058
1059                let room = client
1060                    .get_room(room_id)
1061                    .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
1062                let room_version_rules = room.clone_info().room_version_rules_or_default();
1063
1064                let enabled_thread_support = matches!(
1065                    client.base_client().threading_support,
1066                    ThreadingSupport::Enabled { .. }
1067                );
1068
1069                let update_sender = Sender::new(32);
1070
1071                let room_state = RoomEventCacheStateLock::new(
1072                    room_id.to_owned(),
1073                    room_version_rules,
1074                    enabled_thread_support,
1075                    update_sender.clone(),
1076                    self.generic_update_sender.clone(),
1077                    self.linked_chunk_update_sender.clone(),
1078                    self.store.clone(),
1079                    pagination_status.clone(),
1080                )
1081                .await?;
1082
1083                let timeline_is_not_empty =
1084                    room_state.read().await?.room_linked_chunk().revents().next().is_some();
1085
1086                // SAFETY: we must have subscribed before reaching this code, otherwise
1087                // something is very wrong.
1088                let auto_shrink_sender =
1089                    self.auto_shrink_sender.get().cloned().expect(
1090                        "we must have called `EventCache::subscribe()` before calling here.",
1091                    );
1092
1093                let room_event_cache = RoomEventCache::new(
1094                    self.client.clone(),
1095                    room_state,
1096                    pagination_status,
1097                    room_id.to_owned(),
1098                    auto_shrink_sender,
1099                    update_sender,
1100                    self.generic_update_sender.clone(),
1101                );
1102
1103                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
1104
1105                // If at least one event has been loaded, it means there is a timeline. Let's
1106                // emit a generic update.
1107                if timeline_is_not_empty {
1108                    let _ = self
1109                        .generic_update_sender
1110                        .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
1111                }
1112
1113                Ok(room_event_cache)
1114            }
1115        }
1116    }
1117}
1118
1119/// The result of a single back-pagination request.
1120#[derive(Debug)]
1121pub struct BackPaginationOutcome {
1122    /// Did the back-pagination reach the start of the timeline?
1123    pub reached_start: bool,
1124
1125    /// All the events that have been returned in the back-pagination
1126    /// request.
1127    ///
1128    /// Events are presented in reverse order: the first element of the vec,
1129    /// if present, is the most "recent" event from the chunk (or
1130    /// technically, the last one in the topological ordering).
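    ///
    /// In other words, to process the returned events in chronological order,
    /// iterate over this vector in reverse:
    ///
    /// ```ignore
    /// for event in outcome.events.iter().rev() {
    ///     // Oldest returned event first, most recent one last.
    /// }
    /// ```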
1131    pub events: Vec<TimelineEvent>,
1132}
1133
1134/// Represents a timeline update of a room. It hides the details of
1135/// [`RoomEventCacheUpdate`] by being more generic.
1136///
1137/// This is used by [`EventCache::subscribe_to_room_generic_updates`]. Please
1138/// read it to learn more about the motivation behind this type.
1139#[derive(Clone, Debug)]
1140pub struct RoomEventCacheGenericUpdate {
1141    /// The room ID owning the timeline.
1142    pub room_id: OwnedRoomId,
1143}
1144
1145/// An update being triggered when events change in the persisted event cache
1146/// for any room.
1147#[derive(Clone, Debug)]
1148struct RoomEventCacheLinkedChunkUpdate {
1149    /// The linked chunk affected by the update.
1150    linked_chunk_id: OwnedLinkedChunkId,
1151
1152    /// A vector of all the linked chunk updates that happened during this event
1153    /// cache update.
1154    updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
1155}
1156
1157impl RoomEventCacheLinkedChunkUpdate {
1158    /// Return all the new events propagated by this update, in topological
1159    /// order.
1160    pub fn events(self) -> impl DoubleEndedIterator<Item = TimelineEvent> {
1161        use itertools::Either;
1162        self.updates.into_iter().flat_map(|update| match update {
1163            linked_chunk::Update::PushItems { items, .. } => {
1164                Either::Left(Either::Left(items.into_iter()))
1165            }
1166            linked_chunk::Update::ReplaceItem { item, .. } => {
1167                Either::Left(Either::Right(std::iter::once(item)))
1168            }
1169            linked_chunk::Update::RemoveItem { .. }
1170            | linked_chunk::Update::DetachLastItems { .. }
1171            | linked_chunk::Update::StartReattachItems
1172            | linked_chunk::Update::EndReattachItems
1173            | linked_chunk::Update::NewItemsChunk { .. }
1174            | linked_chunk::Update::NewGapChunk { .. }
1175            | linked_chunk::Update::RemoveChunk(..)
1176            | linked_chunk::Update::Clear => {
1177                // None of these updates contain any new events.
1178                Either::Right(std::iter::empty())
1179            }
1180        })
1181    }
1182}
1183
1184/// An update related to events happened in a room.
1185#[derive(Debug, Clone)]
1186pub enum RoomEventCacheUpdate {
1187    /// The fully read marker has moved to a different event.
1188    MoveReadMarkerTo {
1189        /// Event at which the read marker is now pointing.
1190        event_id: OwnedEventId,
1191    },
1192
1193    /// The members have changed.
1194    UpdateMembers {
1195        /// Collection of ambiguity changes that room member events trigger.
1196        ///
1197        /// This is a map of event ID of the `m.room.member` event to the
1198        /// details of the ambiguity change.
1199        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
1200    },
1201
1202    /// The room has received updates for the timeline as _diffs_.
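    ///
    /// A minimal sketch of applying these diffs to a locally mirrored
    /// timeline, assuming `timeline` is an `eyeball_im::Vector<TimelineEvent>`:
    ///
    /// ```ignore
    /// RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
    ///     for diff in diffs {
    ///         // `VectorDiff::apply` mutates the local copy in place.
    ///         diff.apply(&mut timeline);
    ///     }
    /// }
    /// ```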
1203    UpdateTimelineEvents {
1204        /// Diffs to apply to the timeline.
1205        diffs: Vec<VectorDiff<TimelineEvent>>,
1206
1207        /// Where the diffs are coming from.
1208        origin: EventsOrigin,
1209    },
1210
1211    /// The room has received new ephemeral events.
1212    AddEphemeralEvents {
1213        /// XXX: this is temporary, until read receipts are handled in the event
1214        /// cache
1215        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1216    },
1217}
1218
1219/// Indicate where events are coming from.
1220#[derive(Debug, Clone)]
1221pub enum EventsOrigin {
1222    /// Events are coming from a sync.
1223    Sync,
1224
1225    /// Events are coming from pagination.
1226    Pagination,
1227
1228    /// The cause of the change is purely internal to the cache.
1229    Cache,
1230}
1231
1232#[cfg(test)]
1233mod tests {
1234    use std::{ops::Not, sync::Arc, time::Duration};
1235
1236    use assert_matches::assert_matches;
1237    use futures_util::FutureExt as _;
1238    use matrix_sdk_base::{
1239        RoomState,
1240        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
1241        sync::{JoinedRoomUpdate, RoomUpdates, Timeline},
1242    };
1243    use matrix_sdk_test::{
1244        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
1245    };
1246    use ruma::{event_id, room_id, serde::Raw, user_id};
1247    use serde_json::json;
1248    use tokio::time::sleep;
1249
1250    use super::{EventCacheError, RoomEventCacheGenericUpdate, RoomEventCacheUpdate};
1251    use crate::test_utils::{
1252        assert_event_matches_msg, client::MockClientBuilder, logged_in_client,
1253    };
1254
1255    #[async_test]
1256    async fn test_must_explicitly_subscribe() {
1257        let client = logged_in_client(None).await;
1258
1259        let event_cache = client.event_cache();
1260
1261        // If I create a room event subscriber for a room before subscribing the event
1262        // cache,
1263        let room_id = room_id!("!omelette:fromage.fr");
1264        let result = event_cache.for_room(room_id).await;
1265
1266        // Then it fails, because one must explicitly call `.subscribe()` on the event
1267        // cache.
1268        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
1269    }
1270
1271    #[async_test]
1272    async fn test_uniq_read_marker() {
1273        let client = logged_in_client(None).await;
1274        let room_id = room_id!("!galette:saucisse.bzh");
1275        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1276
1277        let event_cache = client.event_cache();
1278
1279        event_cache.subscribe().unwrap();
1280
1281        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1282        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
1283        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
1284
1285        assert!(events.is_empty());
1286
1287        // When sending multiple times the same read marker event,…
1288        let read_marker_event = Raw::from_json_string(
1289            json!({
1290                "content": {
1291                    "event_id": "$crepe:saucisse.bzh"
1292                },
1293                "room_id": "!galette:saucisse.bzh",
1294                "type": "m.fully_read"
1295            })
1296            .to_string(),
1297        )
1298        .unwrap();
1299        let account_data = vec![read_marker_event; 100];
1300
1301        room_event_cache
1302            .inner
1303            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
1304            .await
1305            .unwrap();
1306
1307        // … there's only one read marker update.
1308        assert_matches!(
1309            stream.recv().await.unwrap(),
1310            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
1311        );
1312
1313        assert!(stream.recv().now_or_never().is_none());
1314
1315        // None, because an account data doesn't trigger a generic update.
1316        assert!(generic_stream.recv().now_or_never().is_none());
1317    }
1318
1319    #[async_test]
1320    async fn test_get_event_by_id() {
1321        let client = logged_in_client(None).await;
1322        let room_id1 = room_id!("!galette:saucisse.bzh");
1323        let room_id2 = room_id!("!crepe:saucisse.bzh");
1324
1325        client.base_client().get_or_create_room(room_id1, RoomState::Joined);
1326        client.base_client().get_or_create_room(room_id2, RoomState::Joined);
1327
1328        let event_cache = client.event_cache();
1329        event_cache.subscribe().unwrap();
1330
1331        // Insert two rooms with a few events.
1332        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));
1333
1334        let eid1 = event_id!("$1");
1335        let eid2 = event_id!("$2");
1336        let eid3 = event_id!("$3");
1337
1338        let joined_room_update1 = JoinedRoomUpdate {
1339            timeline: Timeline {
1340                events: vec![
1341                    f.text_msg("hey").event_id(eid1).into(),
1342                    f.text_msg("you").event_id(eid2).into(),
1343                ],
1344                ..Default::default()
1345            },
1346            ..Default::default()
1347        };
1348
1349        let joined_room_update2 = JoinedRoomUpdate {
1350            timeline: Timeline {
1351                events: vec![f.text_msg("bjr").event_id(eid3).into()],
1352                ..Default::default()
1353            },
1354            ..Default::default()
1355        };
1356
1357        let mut updates = RoomUpdates::default();
1358        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
1359        updates.joined.insert(room_id2.to_owned(), joined_room_update2);
1360
1361        // Have the event cache handle them.
1362        event_cache.inner.handle_room_updates(updates).await.unwrap();
1363
1364        // We can find the events in a single room.
1365        let room1 = client.get_room(room_id1).unwrap();
1366
1367        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();
1368
1369        let found1 = room_event_cache.find_event(eid1).await.unwrap().unwrap();
1370        assert_event_matches_msg(&found1, "hey");
1371
1372        let found2 = room_event_cache.find_event(eid2).await.unwrap().unwrap();
1373        assert_event_matches_msg(&found2, "you");
1374
1375        // Retrieving the event with `eid3` from a room that doesn't contain it
1376        // returns `None`…
1377        assert!(room_event_cache.find_event(eid3).await.unwrap().is_none());
1378    }
1379
1380    #[async_test]
1381    async fn test_save_event() {
1382        let client = logged_in_client(None).await;
1383        let room_id = room_id!("!galette:saucisse.bzh");
1384
1385        let event_cache = client.event_cache();
1386        event_cache.subscribe().unwrap();
1387
1388        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1389        let event_id = event_id!("$1");
1390
1391        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1392        let room = client.get_room(room_id).unwrap();
1393
1394        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1395        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;
1396
1397        // Retrieving the event from the room-wide cache works.
1398        assert!(room_event_cache.find_event(event_id).await.unwrap().is_some());
1399    }
1400
1401    #[async_test]
1402    async fn test_generic_update_when_loading_rooms() {
1403        // Create 2 rooms. One of them has data in the event cache storage.
1404        let user = user_id!("@mnt_io:matrix.org");
1405        let client = logged_in_client(None).await;
1406        let room_id_0 = room_id!("!raclette:patate.ch");
1407        let room_id_1 = room_id!("!fondue:patate.ch");
1408
1409        let event_factory = EventFactory::new().room(room_id_0).sender(user);
1410
1411        let event_cache = client.event_cache();
1412        event_cache.subscribe().unwrap();
1413
1414        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
1415        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
1416
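        // Write a single-event chunk for room 0 directly into the event cache
        // store, so this room has initial data to load.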
1417        client
1418            .event_cache_store()
1419            .lock()
1420            .await
1421            .expect("Could not acquire the event cache lock")
1422            .as_clean()
1423            .expect("Could not acquire a clean event cache lock")
1424            .handle_linked_chunk_updates(
1425                LinkedChunkId::Room(room_id_0),
1426                vec![
1427                    // Non-empty items chunk.
1428                    Update::NewItemsChunk {
1429                        previous: None,
1430                        new: ChunkIdentifier::new(0),
1431                        next: None,
1432                    },
1433                    Update::PushItems {
1434                        at: Position::new(ChunkIdentifier::new(0), 0),
1435                        items: vec![
1436                            event_factory
1437                                .text_msg("hello")
1438                                .sender(user)
1439                                .event_id(event_id!("$ev0"))
1440                                .into_event(),
1441                        ],
1442                    },
1443                ],
1444            )
1445            .await
1446            .unwrap();
1447
1448        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1449
1450        // Room 0 has initial data, so it must trigger a generic update.
1451        {
1452            let _room_event_cache = event_cache.for_room(room_id_0).await.unwrap();
1453
1454            assert_matches!(
1455                generic_stream.recv().await,
1456                Ok(RoomEventCacheGenericUpdate { room_id }) => {
1457                    assert_eq!(room_id, room_id_0);
1458                }
1459            );
1460        }
1461
1462        // Room 1 has NO initial data, so nothing should happen.
1463        {
1464            let _room_event_cache = event_cache.for_room(room_id_1).await.unwrap();
1465
1466            assert!(generic_stream.recv().now_or_never().is_none());
1467        }
1468    }
1469
1470    #[async_test]
1471    async fn test_generic_update_when_paginating_room() {
1472        // Create 1 room, with 4 chunks in the event cache storage.
1473        let user = user_id!("@mnt_io:matrix.org");
1474        let client = logged_in_client(None).await;
1475        let room_id = room_id!("!raclette:patate.ch");
1476
1477        let event_factory = EventFactory::new().room(room_id).sender(user);
1478
1479        let event_cache = client.event_cache();
1480        event_cache.subscribe().unwrap();
1481
1482        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1483
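        // Write four chunks directly into the event cache store: two empty ones
        // followed by two chunks containing one event each, so backwards
        // pagination has older data to load from storage.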
1484        client
1485            .event_cache_store()
1486            .lock()
1487            .await
1488            .expect("Could not acquire the event cache lock")
1489            .as_clean()
1490            .expect("Could not acquire a clean event cache lock")
1491            .handle_linked_chunk_updates(
1492                LinkedChunkId::Room(room_id),
1493                vec![
1494                    // Empty chunk.
1495                    Update::NewItemsChunk {
1496                        previous: None,
1497                        new: ChunkIdentifier::new(0),
1498                        next: None,
1499                    },
1500                    // Empty chunk.
1501                    Update::NewItemsChunk {
1502                        previous: Some(ChunkIdentifier::new(0)),
1503                        new: ChunkIdentifier::new(1),
1504                        next: None,
1505                    },
1506                    // Non-empty items chunk.
1507                    Update::NewItemsChunk {
1508                        previous: Some(ChunkIdentifier::new(1)),
1509                        new: ChunkIdentifier::new(2),
1510                        next: None,
1511                    },
1512                    Update::PushItems {
1513                        at: Position::new(ChunkIdentifier::new(2), 0),
1514                        items: vec![
1515                            event_factory
1516                                .text_msg("hello")
1517                                .sender(user)
1518                                .event_id(event_id!("$ev0"))
1519                                .into_event(),
1520                        ],
1521                    },
1522                    // Non-empty items chunk.
1523                    Update::NewItemsChunk {
1524                        previous: Some(ChunkIdentifier::new(2)),
1525                        new: ChunkIdentifier::new(3),
1526                        next: None,
1527                    },
1528                    Update::PushItems {
1529                        at: Position::new(ChunkIdentifier::new(3), 0),
1530                        items: vec![
1531                            event_factory
1532                                .text_msg("world")
1533                                .sender(user)
1534                                .event_id(event_id!("$ev1"))
1535                                .into_event(),
1536                        ],
1537                    },
1538                ],
1539            )
1540            .await
1541            .unwrap();
1542
1543        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1544
1545        // The room is initialised and loads one event into the timeline.
1546        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1547
1548        assert_matches!(
1549            generic_stream.recv().await,
1550            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1551                assert_eq!(room_id, expected_room_id);
1552            }
1553        );
1554
1555        let pagination = room_event_cache.pagination();
1556
1557        // Paginate: one new event is loaded into the timeline.
1558        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();
1559
1560        assert_eq!(pagination_outcome.events.len(), 1);
1561        assert!(pagination_outcome.reached_start.not());
1562        assert_matches!(
1563            generic_stream.recv().await,
1564            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1565                assert_eq!(room_id, expected_room_id);
1566            }
1567        );
1568
1569        // Paginate again: no new events are loaded into the timeline.
1570        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();
1571
1572        assert!(pagination_outcome.events.is_empty());
1573        assert!(pagination_outcome.reached_start.not());
1574        assert!(generic_stream.recv().now_or_never().is_none());
1575
1576        // Paginate once more: this time we reach the start of the timeline.
1577        let pagination_outcome = pagination.run_backwards_once(1).await.unwrap();
1578
1579        assert!(pagination_outcome.reached_start);
1580        assert!(generic_stream.recv().now_or_never().is_none());
1581    }
1582
1583    #[async_test]
1584    async fn test_for_room_when_room_is_not_found() {
1585        let client = logged_in_client(None).await;
1586        let room_id = room_id!("!raclette:patate.ch");
1587
1588        let event_cache = client.event_cache();
1589        event_cache.subscribe().unwrap();
1590
1591        // The room doesn't exist, so `for_room` returns an error.
1592        assert_matches!(
1593            event_cache.for_room(room_id).await,
1594            Err(EventCacheError::RoomNotFound { room_id: not_found_room_id }) => {
1595                assert_eq!(room_id, not_found_room_id);
1596            }
1597        );
1598
1599        // Now create the room.
1600        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1601
1602        // The room exists now: everything works.
1603        assert!(event_cache.for_room(room_id).await.is_ok());
1604    }
1605
1606    /// Test that the event cache does not create reference cycles or spawn tasks that
1607    /// hold a strong reference to it indefinitely, preventing it from being deallocated.
1608    #[cfg(not(target_family = "wasm"))]
1609    #[async_test]
1610    async fn test_no_refcycle_event_cache_tasks() {
1611        let client = MockClientBuilder::new(None).build().await;
1612
1613        // Wait for the init tasks to die.
1614        sleep(Duration::from_secs(1)).await;
1615
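        // Grab a weak reference to the inner event cache; at this point, the only
        // remaining strong reference should be the one held via the `Client`.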
1616        let event_cache_weak = Arc::downgrade(&client.event_cache().inner);
1617        assert_eq!(event_cache_weak.strong_count(), 1);
1618
1619        {
1620            let room_id = room_id!("!room:example.org");
1621
1622            // Have the client know the room.
1623            let response = SyncResponseBuilder::default()
1624                .add_joined_room(JoinedRoomBuilder::new(room_id))
1625                .build_sync_response();
1626            client.inner.base_client.receive_sync_response(response).await.unwrap();
1627
1628            client.event_cache().subscribe().unwrap();
1629
1630            let (_room_event_cache, _drop_handles) =
1631                client.get_room(room_id).unwrap().event_cache().await.unwrap();
1632        }
1633
1634        drop(client);
1635
1636        // Give a bit of time for background tasks to die.
1637        sleep(Duration::from_secs(1)).await;
1638
1639        // No strong references should exist now that the Client has been dropped.
1640        assert_eq!(
1641            event_cache_weak.strong_count(),
1642            0,
1643            "Too many strong references to the event cache {}",
1644            event_cache_weak.strong_count()
1645        );
1646    }
1647}