Skip to main content

matrix_sdk/event_cache/caches/room/
mod.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod pagination;
16mod state;
17mod subscriber;
18mod updates;
19
20use std::{
21    collections::BTreeMap,
22    fmt,
23    sync::{Arc, atomic::Ordering},
24};
25
26use eyeball::SharedObservable;
27use matrix_sdk_base::{
28    deserialized_responses::AmbiguityChange,
29    event_cache::Event,
30    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
31};
32use ruma::{
33    EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedUserId, RoomId,
34    events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
35    serde::Raw,
36};
37pub(super) use state::{LockedRoomEventCacheState, RoomEventCacheStateLockWriteGuard};
38pub use subscriber::RoomEventCacheSubscriber;
39use tokio::sync::{Notify, broadcast::Receiver, mpsc};
40use tracing::{instrument, trace, warn};
41pub use updates::{
42    RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate,
43    RoomEventCacheUpdateSender,
44};
45
46use super::{
47    super::{AutoShrinkChannelPayload, EventCacheError, EventsOrigin, Result, RoomPagination},
48    TimelineVectorDiffs,
49    event_linked_chunk::sort_positions_descending,
50    thread::pagination::ThreadPagination,
51};
52use crate::{
53    client::WeakClient,
54    event_cache::{
55        EventFocusThreadMode,
56        caches::{event_focused::EventFocusedCache, pagination::SharedPaginationStatus},
57    },
58    room::WeakRoom,
59};
60
61/// A subset of an event cache, for a room.
62///
63/// Cloning is shallow, and thus is cheap to do.
64#[derive(Clone)]
65pub struct RoomEventCache {
66    inner: Arc<RoomEventCacheInner>,
67}
68
69impl fmt::Debug for RoomEventCache {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        f.debug_struct("RoomEventCache").finish_non_exhaustive()
72    }
73}
74
75impl RoomEventCache {
76    /// Create a new [`RoomEventCache`] using the given room and store.
77    pub(super) fn new(
78        room_id: OwnedRoomId,
79        weak_room: WeakRoom,
80        state: LockedRoomEventCacheState,
81        shared_pagination_status: SharedObservable<SharedPaginationStatus>,
82        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
83        update_sender: RoomEventCacheUpdateSender,
84    ) -> Self {
85        Self {
86            inner: Arc::new(RoomEventCacheInner::new(
87                room_id,
88                weak_room,
89                state,
90                shared_pagination_status,
91                auto_shrink_sender,
92                update_sender,
93            )),
94        }
95    }
96
97    /// Get the room ID for this [`RoomEventCache`].
98    pub fn room_id(&self) -> &RoomId {
99        &self.inner.room_id
100    }
101
102    /// Read all current events.
103    ///
104    /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
105    /// subscriber.
106    pub async fn events(&self) -> Result<Vec<Event>> {
107        let state = self.inner.state.read().await?;
108
109        Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
110    }
111
112    /// Subscribe to this room updates, after getting the initial list of
113    /// events.
114    ///
115    /// Use [`RoomEventCache::events`] to get all current events without the
116    /// subscriber. Creating, and especially dropping, a
117    /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
118    pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
119        let state = self.inner.state.read().await?;
120        let events =
121            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
122
123        let subscriber_count = state.subscriber_count();
124        let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
125        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
126
127        let subscriber = RoomEventCacheSubscriber::new(
128            self.inner.update_sender.new_room_receiver(),
129            self.inner.room_id.clone(),
130            self.inner.auto_shrink_sender.clone(),
131            subscriber_count.clone(),
132        );
133
134        Ok((events, subscriber))
135    }
136
137    /// Subscribe to thread for a given root event, and get a (maybe empty)
138    /// initially known list of events for that thread.
139    pub async fn subscribe_to_thread(
140        &self,
141        thread_root: OwnedEventId,
142    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
143        let mut state = self.inner.state.write().await?;
144
145        state.subscribe_to_thread(thread_root).await
146    }
147
148    /// Subscribe to the pinned event cache for this room.
149    ///
150    /// This is a persisted view over the pinned events of a room.
151    ///
152    /// The pinned events will be initially reloaded from storage, and/or loaded
153    /// from a network request to fetch the latest pinned events and their
154    /// relations, to update it as needed. The list of pinned events will
155    /// also be kept up-to-date as new events are pinned, and new
156    /// related events show up from other sources.
157    pub async fn subscribe_to_pinned_events(
158        &self,
159    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
160        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
161        let state = self.inner.state.read().await?;
162
163        state.subscribe_to_pinned_events(room).await
164    }
165
166    /// Create or get an event-focused timeline cache for this room.
167    ///
168    /// This creates a timeline centered around a specific event (e.g., for
169    /// permalinks), in a given mode, supporting both forward and backward
170    /// pagination.
171    ///
172    /// If the focused event is part of a thread, the timeline will
173    /// automatically use thread-specific pagination.
174    ///
175    /// If the thread mode is defined to [`EventFocusThreadMode::ForceThread`],
176    /// the timeline will be focused on the thread root of the thread the
177    /// target event belongs to, or it will consider that the target event
178    /// itself is the thread root.
179    #[instrument(skip(self), fields(room_id = %self.inner.room_id, event_id = %event_id, thread_mode = ?thread_mode))]
180    pub async fn get_or_create_event_focused_cache(
181        &self,
182        event_id: OwnedEventId,
183        num_context_events: u16,
184        thread_mode: EventFocusThreadMode,
185    ) -> Result<EventFocusedCache> {
186        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
187        let guard = self.inner.state.read().await?;
188
189        // Check if we already have a cache for this event.
190        if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
191            trace!("the cache was already created, returning it");
192            return Ok(cache);
193        }
194
195        // Create a new cache.
196        let linked_chunk_update_sender = guard.state.linked_chunk_update_sender.clone();
197
198        // Make sure to drop the guard before calling `start_from` below, as it may need
199        // to lock the room event cache's state again, when memoizing events
200        // received from the network response.
201        drop(guard);
202
203        let room_id = room.room_id().to_owned();
204        let weak_room = WeakRoom::new(WeakClient::from_client(&room.client()), room_id.clone());
205
206        trace!("creating a fresh event-focused cache");
207        let cache = EventFocusedCache::new(weak_room, event_id.clone(), linked_chunk_update_sender);
208
209        // Initialize the cache from the server.
210        cache.start_from(room, num_context_events, thread_mode).await?;
211
212        let mut guard = self.inner.state.write().await?;
213
214        // Check again if we already have a cache for this event, just in case there was
215        // a race with another caller during initialization.
216        if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
217            trace!("another cache has been racily created, returning it");
218            return Ok(cache);
219        }
220
221        // Insert the cache in the map.
222        guard.insert_event_focused_cache(event_id, thread_mode, cache.clone());
223
224        Ok(cache)
225    }
226
227    /// Get an event-focused cache for this event and thread mode, if it exists.
228    ///
229    /// Otherwise, returns `None`.
230    ///
231    /// Use [`Self::get_or_create_event_focused_cache`] for ensuring such a
232    /// cache exists.
233    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
234    pub async fn get_event_focused_cache(
235        &self,
236        event_id: OwnedEventId,
237        thread_mode: EventFocusThreadMode,
238    ) -> Result<Option<EventFocusedCache>> {
239        Ok(self.inner.state.read().await?.get_event_focused_cache(event_id, thread_mode))
240    }
241
242    /// Return a [`RoomPagination`] type useful for running back-pagination
243    /// queries in the current room.
244    pub fn pagination(&self) -> RoomPagination {
245        RoomPagination::new(self.inner.clone())
246    }
247
248    /// Return a [`ThreadPagination`] type useful for running back-pagination
249    /// queries in the `thread_id` thread.
250    pub async fn thread_pagination(&self, thread_id: OwnedEventId) -> Result<ThreadPagination> {
251        Ok(self.inner.state.write().await?.get_or_reload_thread(thread_id).pagination())
252    }
253
254    /// Try to find a single event in this room, starting from the most recent
255    /// event.
256    ///
257    /// The `predicate` receives the current event as its single argument.
258    ///
259    /// **Warning**! It looks into the loaded events from the in-memory linked
260    /// chunk **only**. It doesn't look inside the storage.
261    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
262    where
263        P: FnMut(&Event) -> Option<O>,
264    {
265        Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
266    }
267
268    /// Try to find an event by ID in this room.
269    ///
270    /// It starts by looking into loaded events before looking inside the
271    /// storage.
272    pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
273        Ok(self
274            .inner
275            .state
276            .read()
277            .await?
278            .find_event(event_id)
279            .await
280            .ok()
281            .flatten()
282            .map(|(_loc, event)| event))
283    }
284
285    /// Try to find an event by ID in this room, along with its related events.
286    ///
287    /// You can filter which types of related events to retrieve using
288    /// `filter`. `None` will retrieve related events of any type.
289    ///
290    /// The related events are sorted like this:
291    ///
292    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
293    ///   located at the beginning of the array.
294    /// - events present in the linked chunk (be it in memory or in the storage)
295    ///   will be sorted according to their ordering in the linked chunk.
296    pub async fn find_event_with_relations(
297        &self,
298        event_id: &EventId,
299        filter: Option<Vec<RelationType>>,
300    ) -> Result<Option<(Event, Vec<Event>)>> {
301        // Search in all loaded or stored events.
302        Ok(self
303            .inner
304            .state
305            .read()
306            .await?
307            .find_event_with_relations(event_id, filter.clone())
308            .await
309            .ok()
310            .flatten())
311    }
312
313    /// Try to find the related events for an event by ID in this room.
314    ///
315    /// You can filter which types of related events to retrieve using
316    /// `filter`. `None` will retrieve related events of any type.
317    ///
318    /// The related events are sorted like this:
319    ///
320    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
321    ///   located at the beginning of the array.
322    /// - events present in the linked chunk (be it in memory or in the storage)
323    ///   will be sorted according to their ordering in the linked chunk.
324    pub async fn find_event_relations(
325        &self,
326        event_id: &EventId,
327        filter: Option<Vec<RelationType>>,
328    ) -> Result<Vec<Event>> {
329        // Search in all loaded or stored events.
330        self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
331    }
332
333    /// Clear all the storage for this [`RoomEventCache`].
334    ///
335    /// This will get rid of all the events from the linked chunk and persisted
336    /// storage.
337    pub async fn clear(&self) -> Result<()> {
338        // Clear the linked chunk and persisted storage.
339        let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
340
341        // Notify observers about the update.
342        self.inner.update_sender.send(
343            RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
344                diffs: updates_as_vector_diffs,
345                origin: EventsOrigin::Cache,
346            }),
347            Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
348        );
349
350        Ok(())
351    }
352
353    /// Return a reference to the state.
354    pub(in super::super) fn state(&self) -> &LockedRoomEventCacheState {
355        &self.inner.state
356    }
357
358    /// Handle a [`JoinedRoomUpdate`].
359    #[instrument(skip_all, fields(room_id = %self.room_id()))]
360    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
361        self.inner
362            .handle_timeline(
363                updates.timeline,
364                updates.ephemeral.clone(),
365                updates.ambiguity_changes,
366                updates.avatar_changes,
367            )
368            .await?;
369        self.inner.handle_account_data(updates.account_data);
370
371        Ok(())
372    }
373
374    /// Handle a [`LeftRoomUpdate`].
375    #[instrument(skip_all, fields(room_id = %self.room_id()))]
376    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
377        self.inner
378            .handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes, None)
379            .await?;
380
381        Ok(())
382    }
383
384    /// Get a reference to the [`RoomEventCacheUpdateSender`].
385    pub(in super::super) fn update_sender(&self) -> &RoomEventCacheUpdateSender {
386        &self.inner.update_sender
387    }
388
389    /// Handle a single event from the `SendQueue`.
390    pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
391        self.inner.insert_sent_event_from_send_queue(event).await
392    }
393
394    /// Save some events in the event cache, for further retrieval with
395    /// [`Self::event`].
396    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
397        match self.inner.state.write().await {
398            Ok(mut state_guard) => {
399                if let Err(err) = state_guard.save_events(events).await {
400                    warn!("couldn't save event in the event cache: {err}");
401                }
402            }
403
404            Err(err) => {
405                warn!("couldn't save event in the event cache: {err}");
406            }
407        }
408    }
409
410    /// Return a nice debug string (a vector of lines) for the linked chunk of
411    /// events for this room.
412    pub async fn debug_string(&self) -> Vec<String> {
413        match self.inner.state.read().await {
414            Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
415            Err(err) => {
416                warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
417
418                vec![]
419            }
420        }
421    }
422}
423
424/// The (non-cloneable) details of the `RoomEventCache`.
425pub(super) struct RoomEventCacheInner {
426    /// The room id for this room.
427    room_id: OwnedRoomId,
428
429    pub weak_room: WeakRoom,
430
431    /// State for this room's event cache.
432    pub state: LockedRoomEventCacheState,
433
434    /// A notifier that we received a new pagination token.
435    pub pagination_batch_token_notifier: Notify,
436
437    pub shared_pagination_status: SharedObservable<SharedPaginationStatus>,
438
439    /// Sender to the auto-shrink channel.
440    ///
441    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
442    /// more details.
443    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
444
445    /// Update sender for this room.
446    update_sender: RoomEventCacheUpdateSender,
447}
448
449impl RoomEventCacheInner {
450    /// Creates a new cache for a room, and subscribes to room updates, so as
451    /// to handle new timeline events.
452    fn new(
453        room_id: OwnedRoomId,
454        weak_room: WeakRoom,
455        state: LockedRoomEventCacheState,
456        shared_pagination_status: SharedObservable<SharedPaginationStatus>,
457        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
458        update_sender: RoomEventCacheUpdateSender,
459    ) -> Self {
460        Self {
461            room_id,
462            weak_room,
463            state,
464            update_sender,
465            pagination_batch_token_notifier: Default::default(),
466            auto_shrink_sender,
467            shared_pagination_status,
468        }
469    }
470
471    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
472        if account_data.is_empty() {
473            return;
474        }
475
476        let mut handled_read_marker = false;
477
478        trace!("Handling account data");
479
480        for raw_event in account_data {
481            match raw_event.deserialize() {
482                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
483                    // If duplicated, do not forward read marker multiple times
484                    // to avoid clutter the update channel.
485                    if handled_read_marker {
486                        continue;
487                    }
488
489                    handled_read_marker = true;
490
491                    // Propagate to observers. (We ignore the error if there aren't any.)
492                    self.update_sender.send(
493                        RoomEventCacheUpdate::MoveReadMarkerTo { event_id: ev.content.event_id },
494                        None,
495                    );
496                }
497
498                Ok(_) => {
499                    // We're not interested in other room account data updates,
500                    // at this point.
501                }
502
503                Err(e) => {
504                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
505                    warn!(event_type, "Failed to deserialize account data: {e}");
506                }
507            }
508        }
509    }
510
511    /// Handle a [`Timeline`], i.e. new events received by a sync for this
512    /// room.
513    async fn handle_timeline(
514        &self,
515        timeline: Timeline,
516        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
517        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
518        avatar_changes: Option<BTreeMap<OwnedUserId, Option<OwnedMxcUri>>>,
519    ) -> Result<()> {
520        self.handle_timeline_inner(
521            self.state.write().await?,
522            timeline,
523            ephemeral_events,
524            ambiguity_changes,
525            avatar_changes,
526        )
527        .await
528    }
529
530    /// Handle a single event from the `SendQueue`.
531    ///
532    /// The event is inserted if and only if the cache is not empty.
533    async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
534        let state = self.state.write().await?;
535
536        // Insert the event if the room is not empty, otherwise it can break the
537        // pagination logic when detecting the start of the timeline because no gap can
538        // be inserted properly: it is impossible to compute a `prev_batch` token here.
539        if state.room_linked_chunk().events().next().is_some() {
540            return self
541                .handle_timeline_inner(
542                    state,
543                    Timeline { limited: false, prev_batch: None, events: vec![event] },
544                    Vec::new(),
545                    BTreeMap::new(),
546                    None,
547                )
548                .await;
549        }
550
551        Ok(())
552    }
553
554    async fn handle_timeline_inner(
555        &self,
556        mut state: RoomEventCacheStateLockWriteGuard<'_>,
557        timeline: Timeline,
558        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
559        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
560        avatar_changes: Option<BTreeMap<OwnedUserId, Option<OwnedMxcUri>>>,
561    ) -> Result<()> {
562        if timeline.events.is_empty()
563            && timeline.prev_batch.is_none()
564            && ephemeral_events.is_empty()
565            && ambiguity_changes.is_empty()
566            && avatar_changes.as_ref().is_none_or(|avatars| avatars.is_empty())
567        {
568            return Ok(());
569        }
570
571        // Add all the events to the backend.
572        trace!("adding new events");
573
574        let (stored_prev_batch_token, timeline_event_diffs) =
575            state.handle_sync(timeline, &ephemeral_events).await?;
576
577        drop(state);
578
579        // Now that all events have been added, we can trigger the
580        // `pagination_token_notifier`.
581        if stored_prev_batch_token {
582            self.pagination_batch_token_notifier.notify_one();
583        }
584
585        // The order matters here: first send the timeline event diffs, then only the
586        // related events (read receipts, etc.).
587        if !timeline_event_diffs.is_empty() {
588            self.update_sender.send(
589                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
590                    diffs: timeline_event_diffs,
591                    origin: EventsOrigin::Sync,
592                }),
593                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
594            );
595        }
596
597        if !ephemeral_events.is_empty() {
598            self.update_sender
599                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events }, None);
600        }
601
602        if !ambiguity_changes.is_empty() || avatar_changes.as_ref().is_some_and(|c| !c.is_empty()) {
603            self.update_sender.send(
604                RoomEventCacheUpdate::UpdateMembers { ambiguity_changes, avatar_changes },
605                None,
606            );
607        }
608
609        Ok(())
610    }
611}
612
613#[derive(Clone, Copy)]
614pub(in super::super) enum PostProcessingOrigin {
615    Sync,
616    Backpagination,
617    #[cfg(feature = "e2e-encryption")]
618    Redecryption,
619}
620
621#[cfg(test)]
622mod tests {
623    use matrix_sdk_base::{RoomState, event_cache::Event};
624    use matrix_sdk_test::{async_test, event_factory::EventFactory};
625    use ruma::{
626        RoomId, event_id,
627        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
628        room_id, user_id,
629    };
630
631    use crate::test_utils::logged_in_client;
632
633    #[async_test]
634    async fn test_find_event_by_id_with_edit_relation() {
635        let original_id = event_id!("$original");
636        let related_id = event_id!("$related");
637        let room_id = room_id!("!galette:saucisse.bzh");
638        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
639
640        assert_relations(
641            room_id,
642            f.text_msg("Original event").event_id(original_id).into(),
643            f.text_msg("* An edited event")
644                .edit(
645                    original_id,
646                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
647                )
648                .event_id(related_id)
649                .into(),
650            f,
651        )
652        .await;
653    }
654
655    #[async_test]
656    async fn test_find_event_by_id_with_thread_reply_relation() {
657        let original_id = event_id!("$original");
658        let related_id = event_id!("$related");
659        let room_id = room_id!("!galette:saucisse.bzh");
660        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
661
662        assert_relations(
663            room_id,
664            f.text_msg("Original event").event_id(original_id).into(),
665            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
666            f,
667        )
668        .await;
669    }
670
671    #[async_test]
672    async fn test_find_event_by_id_with_reaction_relation() {
673        let original_id = event_id!("$original");
674        let related_id = event_id!("$related");
675        let room_id = room_id!("!galette:saucisse.bzh");
676        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
677
678        assert_relations(
679            room_id,
680            f.text_msg("Original event").event_id(original_id).into(),
681            f.reaction(original_id, ":D").event_id(related_id).into(),
682            f,
683        )
684        .await;
685    }
686
687    #[async_test]
688    async fn test_find_event_by_id_with_poll_response_relation() {
689        let original_id = event_id!("$original");
690        let related_id = event_id!("$related");
691        let room_id = room_id!("!galette:saucisse.bzh");
692        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
693
694        assert_relations(
695            room_id,
696            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
697                .event_id(original_id)
698                .into(),
699            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
700            f,
701        )
702        .await;
703    }
704
705    #[async_test]
706    async fn test_find_event_by_id_with_poll_end_relation() {
707        let original_id = event_id!("$original");
708        let related_id = event_id!("$related");
709        let room_id = room_id!("!galette:saucisse.bzh");
710        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
711
712        assert_relations(
713            room_id,
714            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
715                .event_id(original_id)
716                .into(),
717            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
718            f,
719        )
720        .await;
721    }
722
723    #[async_test]
724    async fn test_find_event_by_id_with_filtered_relationships() {
725        let original_id = event_id!("$original");
726        let related_id = event_id!("$related");
727        let associated_related_id = event_id!("$recursive_related");
728        let room_id = room_id!("!galette:saucisse.bzh");
729        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
730
731        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
732        let related_event = event_factory
733            .text_msg("* Edited event")
734            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
735            .event_id(related_id)
736            .into();
737        let associated_related_event =
738            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
739
740        let client = logged_in_client(None).await;
741
742        let event_cache = client.event_cache();
743        event_cache.subscribe().unwrap();
744
745        client.base_client().get_or_create_room(room_id, RoomState::Joined);
746        let room = client.get_room(room_id).unwrap();
747
748        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
749
750        // Save the original event.
751        room_event_cache.save_events([original_event]).await;
752
753        // Save the related event.
754        room_event_cache.save_events([related_event]).await;
755
756        // Save the associated related event, which redacts the related event.
757        room_event_cache.save_events([associated_related_event]).await;
758
759        let filter = Some(vec![RelationType::Replacement]);
760        let (event, related_events) = room_event_cache
761            .find_event_with_relations(original_id, filter)
762            .await
763            .expect("Failed to find the event with relations")
764            .expect("Event has no relation");
765        // Fetched event is the right one.
766        let cached_event_id = event.event_id().unwrap();
767        assert_eq!(cached_event_id, original_id);
768
769        // There's only the edit event (an edit event can't have its own edit event).
770        assert_eq!(related_events.len(), 1);
771
772        let related_event_id = related_events[0].event_id().unwrap();
773        assert_eq!(related_event_id, related_id);
774
775        // Now we'll filter threads instead, there should be no related events
776        let filter = Some(vec![RelationType::Thread]);
777        let (event, related_events) = room_event_cache
778            .find_event_with_relations(original_id, filter)
779            .await
780            .expect("Failed to find the event with relations")
781            .expect("Event has no relation");
782
783        // Fetched event is the right one.
784        let cached_event_id = event.event_id().unwrap();
785        assert_eq!(cached_event_id, original_id);
786        // No Thread related events found
787        assert!(related_events.is_empty());
788    }
789
790    #[async_test]
791    async fn test_find_event_by_id_with_recursive_relation() {
792        let original_id = event_id!("$original");
793        let related_id = event_id!("$related");
794        let associated_related_id = event_id!("$recursive_related");
795        let room_id = room_id!("!galette:saucisse.bzh");
796        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
797
798        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
799        let related_event = event_factory
800            .text_msg("* Edited event")
801            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
802            .event_id(related_id)
803            .into();
804        let associated_related_event =
805            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
806
807        let client = logged_in_client(None).await;
808
809        let event_cache = client.event_cache();
810        event_cache.subscribe().unwrap();
811
812        client.base_client().get_or_create_room(room_id, RoomState::Joined);
813        let room = client.get_room(room_id).unwrap();
814
815        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
816
817        // Save the original event.
818        room_event_cache.save_events([original_event]).await;
819
820        // Save the related event.
821        room_event_cache.save_events([related_event]).await;
822
823        // Save the associated related event, which redacts the related event.
824        room_event_cache.save_events([associated_related_event]).await;
825
826        let (event, related_events) = room_event_cache
827            .find_event_with_relations(original_id, None)
828            .await
829            .expect("Failed to find the event with relations")
830            .expect("Event has no relation");
831        // Fetched event is the right one.
832        let cached_event_id = event.event_id().unwrap();
833        assert_eq!(cached_event_id, original_id);
834
835        // There are both the related id and the associatively related id
836        assert_eq!(related_events.len(), 2);
837
838        let related_event_id = related_events[0].event_id().unwrap();
839        assert_eq!(related_event_id, related_id);
840        let related_event_id = related_events[1].event_id().unwrap();
841        assert_eq!(related_event_id, associated_related_id);
842    }
843
844    async fn assert_relations(
845        room_id: &RoomId,
846        original_event: Event,
847        related_event: Event,
848        event_factory: EventFactory,
849    ) {
850        let client = logged_in_client(None).await;
851
852        let event_cache = client.event_cache();
853        event_cache.subscribe().unwrap();
854
855        client.base_client().get_or_create_room(room_id, RoomState::Joined);
856        let room = client.get_room(room_id).unwrap();
857
858        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
859
860        // Save the original event.
861        let original_event_id = original_event.event_id().unwrap();
862        room_event_cache.save_events([original_event]).await;
863
864        // Save an unrelated event to check it's not in the related events list.
865        let unrelated_id = event_id!("$2");
866        room_event_cache
867            .save_events([event_factory
868                .text_msg("An unrelated event")
869                .event_id(unrelated_id)
870                .into()])
871            .await;
872
873        // Save the related event.
874        let related_id = related_event.event_id().unwrap();
875        room_event_cache.save_events([related_event]).await;
876
877        let (event, related_events) = room_event_cache
878            .find_event_with_relations(&original_event_id, None)
879            .await
880            .expect("Failed to find the event with relations")
881            .expect("Event has no relation");
882        // Fetched event is the right one.
883        let cached_event_id = event.event_id().unwrap();
884        assert_eq!(cached_event_id, original_event_id);
885
886        // There is only the actually related event in the related ones
887        let related_event_id = related_events[0].event_id().unwrap();
888        assert_eq!(related_event_id, related_id);
889    }
890}
891
892#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
893mod timed_tests {
894    use std::{ops::Not, sync::Arc};
895
896    use assert_matches::assert_matches;
897    use assert_matches2::assert_let;
898    use eyeball_im::VectorDiff;
899    use futures_util::FutureExt;
900    use matrix_sdk_base::{
901        RoomState,
902        event_cache::{
903            Gap,
904            store::{EventCacheStore as _, MemoryStore},
905        },
906        linked_chunk::{
907            ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
908            lazy_loader::from_all_chunks,
909        },
910        store::StoreConfig,
911        sync::{JoinedRoomUpdate, Timeline},
912    };
913    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
914    use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
915    use ruma::{
916        EventId, event_id,
917        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
918        room_id,
919        serde::Raw,
920        user_id,
921    };
922    use serde_json::json;
923    use tokio::task::yield_now;
924
925    use super::{
926        super::{
927            super::TimelineVectorDiffs, lock::Reload as _,
928            pagination::LoadMoreEventsBackwardsOutcome,
929        },
930        RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
931    };
932    use crate::{assert_let_timeout, test_utils::client::MockClientBuilder};
933
934    #[async_test]
935    async fn test_write_to_storage() {
936        let room_id = room_id!("!galette:saucisse.bzh");
937        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
938
939        let event_cache_store = Arc::new(MemoryStore::new());
940
941        let client = MockClientBuilder::new(None)
942            .on_builder(|builder| {
943                builder.store_config(
944                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
945                        .event_cache_store(event_cache_store.clone()),
946                )
947            })
948            .build()
949            .await;
950
951        let event_cache = client.event_cache();
952
953        // Don't forget to subscribe and like.
954        event_cache.subscribe().unwrap();
955
956        client.base_client().get_or_create_room(room_id, RoomState::Joined);
957        let room = client.get_room(room_id).unwrap();
958
959        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
960        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
961
962        // Propagate an update for a message and a prev-batch token.
963        let timeline = Timeline {
964            limited: true,
965            prev_batch: Some("raclette".to_owned()),
966            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
967        };
968
969        room_event_cache
970            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
971            .await
972            .unwrap();
973
974        // Just checking the generic update is correct.
975        assert_matches!(
976            generic_stream.recv().await,
977            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
978                assert_eq!(expected_room_id, room_id);
979            }
980        );
981        assert!(generic_stream.is_empty());
982
983        // Check the storage.
984        let linked_chunk = from_all_chunks::<3, _, _>(
985            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
986        )
987        .unwrap()
988        .unwrap();
989
990        assert_eq!(linked_chunk.chunks().count(), 2);
991
992        let mut chunks = linked_chunk.chunks();
993
994        // We start with the gap.
995        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
996            assert_eq!(gap.token, "raclette");
997        });
998
999        // Then we have the stored event.
1000        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1001            assert_eq!(events.len(), 1);
1002            let deserialized = events[0].raw().deserialize().unwrap();
1003            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
1004            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
1005        });
1006
1007        // That's all, folks!
1008        assert!(chunks.next().is_none());
1009    }
1010
1011    #[async_test]
1012    async fn test_write_to_storage_strips_bundled_relations() {
1013        let room_id = room_id!("!galette:saucisse.bzh");
1014        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1015
1016        let event_cache_store = Arc::new(MemoryStore::new());
1017
1018        let client = MockClientBuilder::new(None)
1019            .on_builder(|builder| {
1020                builder.store_config(
1021                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1022                        .event_cache_store(event_cache_store.clone()),
1023                )
1024            })
1025            .build()
1026            .await;
1027
1028        let event_cache = client.event_cache();
1029
1030        // Don't forget to subscribe and like.
1031        event_cache.subscribe().unwrap();
1032
1033        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1034        let room = client.get_room(room_id).unwrap();
1035
1036        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1037        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1038
1039        // Propagate an update for a message with bundled relations.
1040        let ev = f
1041            .text_msg("hey yo")
1042            .sender(*ALICE)
1043            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
1044            .into_event();
1045
1046        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
1047
1048        room_event_cache
1049            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1050            .await
1051            .unwrap();
1052
1053        // Just checking the generic update is correct.
1054        assert_matches!(
1055            generic_stream.recv().await,
1056            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1057                assert_eq!(expected_room_id, room_id);
1058            }
1059        );
1060        assert!(generic_stream.is_empty());
1061
1062        // The in-memory linked chunk keeps the bundled relation.
1063        {
1064            let events = room_event_cache.events().await.unwrap();
1065
1066            assert_eq!(events.len(), 1);
1067
1068            let ev = events[0].raw().deserialize().unwrap();
1069            assert_let!(
1070                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
1071            );
1072
1073            let original = msg.as_original().unwrap();
1074            assert_eq!(original.content.body(), "hey yo");
1075            assert!(original.unsigned.relations.replace.is_some());
1076        }
1077
1078        // The one in storage does not.
1079        let linked_chunk = from_all_chunks::<3, _, _>(
1080            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
1081        )
1082        .unwrap()
1083        .unwrap();
1084
1085        assert_eq!(linked_chunk.chunks().count(), 1);
1086
1087        let mut chunks = linked_chunk.chunks();
1088        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1089            assert_eq!(events.len(), 1);
1090
1091            let ev = events[0].raw().deserialize().unwrap();
1092            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
1093
1094            let original = msg.as_original().unwrap();
1095            assert_eq!(original.content.body(), "hey yo");
1096            assert!(original.unsigned.relations.replace.is_none());
1097        });
1098
1099        // That's all, folks!
1100        assert!(chunks.next().is_none());
1101    }
1102
1103    #[async_test]
1104    async fn test_clear() {
1105        let room_id = room_id!("!galette:saucisse.bzh");
1106        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1107
1108        let event_cache_store = Arc::new(MemoryStore::new());
1109
1110        let event_id1 = event_id!("$1");
1111        let event_id2 = event_id!("$2");
1112
1113        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1114        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1115
1116        // Prefill the store with some data.
1117        event_cache_store
1118            .handle_linked_chunk_updates(
1119                LinkedChunkId::Room(room_id),
1120                vec![
1121                    // An empty items chunk.
1122                    Update::NewItemsChunk {
1123                        previous: None,
1124                        new: ChunkIdentifier::new(0),
1125                        next: None,
1126                    },
1127                    // A gap chunk.
1128                    Update::NewGapChunk {
1129                        previous: Some(ChunkIdentifier::new(0)),
1130                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
1131                        new: ChunkIdentifier::new(42),
1132                        next: None,
1133                        gap: Gap { token: "comté".to_owned() },
1134                    },
1135                    // Another items chunk, non-empty this time.
1136                    Update::NewItemsChunk {
1137                        previous: Some(ChunkIdentifier::new(42)),
1138                        new: ChunkIdentifier::new(1),
1139                        next: None,
1140                    },
1141                    Update::PushItems {
1142                        at: Position::new(ChunkIdentifier::new(1), 0),
1143                        items: vec![ev1.clone()],
1144                    },
1145                    // And another items chunk, non-empty again.
1146                    Update::NewItemsChunk {
1147                        previous: Some(ChunkIdentifier::new(1)),
1148                        new: ChunkIdentifier::new(2),
1149                        next: None,
1150                    },
1151                    Update::PushItems {
1152                        at: Position::new(ChunkIdentifier::new(2), 0),
1153                        items: vec![ev2.clone()],
1154                    },
1155                ],
1156            )
1157            .await
1158            .unwrap();
1159
1160        let client = MockClientBuilder::new(None)
1161            .on_builder(|builder| {
1162                builder.store_config(
1163                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1164                        .event_cache_store(event_cache_store.clone()),
1165                )
1166            })
1167            .build()
1168            .await;
1169
1170        let event_cache = client.event_cache();
1171
1172        // Don't forget to subscribe and like.
1173        event_cache.subscribe().unwrap();
1174
1175        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1176        let room = client.get_room(room_id).unwrap();
1177
1178        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1179
1180        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
1181        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1182
1183        // The rooms knows about all cached events.
1184        {
1185            assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1186            assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
1187        }
1188
1189        // But only part of events are loaded from the store
1190        {
1191            // The room must contain only one event because only one chunk has been loaded.
1192            assert_eq!(items.len(), 1);
1193            assert_eq!(items[0].event_id().unwrap(), event_id2);
1194
1195            assert!(stream.is_empty());
1196        }
1197
1198        // Let's load more chunks to load all events.
1199        {
1200            room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1201
1202            assert_let_timeout!(
1203                Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1204                    stream.recv()
1205            );
1206            assert_eq!(diffs.len(), 1);
1207            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1208                // Here you are `event_id1`!
1209                assert_eq!(event.event_id().unwrap(), event_id1);
1210            });
1211
1212            assert!(stream.is_empty());
1213
1214            assert_let_timeout!(
1215                Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
1216                    generic_stream.recv()
1217            );
1218            assert_eq!(room_id, expected_room_id);
1219            assert!(generic_stream.is_empty());
1220        }
1221
1222        // After clearing,…
1223        room_event_cache.clear().await.unwrap();
1224
1225        //… we get an update that the content has been cleared.
1226        assert_let_timeout!(
1227            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1228                stream.recv()
1229        );
1230        assert_eq!(diffs.len(), 1);
1231        assert_let!(VectorDiff::Clear = &diffs[0]);
1232
1233        // … same with a generic update.
1234        assert_let_timeout!(
1235            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
1236        );
1237        assert_eq!(received_room_id, room_id);
1238        assert!(generic_stream.is_empty());
1239
1240        // Events individually are not forgotten by the event cache, after clearing a
1241        // room.
1242        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1243
1244        // But their presence in a linked chunk is forgotten.
1245        let items = room_event_cache.events().await.unwrap();
1246        assert!(items.is_empty());
1247
1248        // The event cache store too.
1249        let linked_chunk = from_all_chunks::<3, _, _>(
1250            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
1251        )
1252        .unwrap()
1253        .unwrap();
1254
1255        // Note: while the event cache store could return `None` here, clearing it will
1256        // reset it to its initial form, maintaining the invariant that it
1257        // contains a single items chunk that's empty.
1258        assert_eq!(linked_chunk.num_items(), 0);
1259    }
1260
1261    #[async_test]
1262    async fn test_load_from_storage() {
1263        let room_id = room_id!("!galette:saucisse.bzh");
1264        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1265
1266        let event_cache_store = Arc::new(MemoryStore::new());
1267
1268        let event_id1 = event_id!("$1");
1269        let event_id2 = event_id!("$2");
1270
1271        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1272        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1273
1274        // Prefill the store with some data.
1275        event_cache_store
1276            .handle_linked_chunk_updates(
1277                LinkedChunkId::Room(room_id),
1278                vec![
1279                    // An empty items chunk.
1280                    Update::NewItemsChunk {
1281                        previous: None,
1282                        new: ChunkIdentifier::new(0),
1283                        next: None,
1284                    },
1285                    // A gap chunk.
1286                    Update::NewGapChunk {
1287                        previous: Some(ChunkIdentifier::new(0)),
1288                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
1289                        new: ChunkIdentifier::new(42),
1290                        next: None,
1291                        gap: Gap { token: "cheddar".to_owned() },
1292                    },
1293                    // Another items chunk, non-empty this time.
1294                    Update::NewItemsChunk {
1295                        previous: Some(ChunkIdentifier::new(42)),
1296                        new: ChunkIdentifier::new(1),
1297                        next: None,
1298                    },
1299                    Update::PushItems {
1300                        at: Position::new(ChunkIdentifier::new(1), 0),
1301                        items: vec![ev1.clone()],
1302                    },
1303                    // And another items chunk, non-empty again.
1304                    Update::NewItemsChunk {
1305                        previous: Some(ChunkIdentifier::new(1)),
1306                        new: ChunkIdentifier::new(2),
1307                        next: None,
1308                    },
1309                    Update::PushItems {
1310                        at: Position::new(ChunkIdentifier::new(2), 0),
1311                        items: vec![ev2.clone()],
1312                    },
1313                ],
1314            )
1315            .await
1316            .unwrap();
1317
1318        let client = MockClientBuilder::new(None)
1319            .on_builder(|builder| {
1320                builder.store_config(
1321                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
1322                        .event_cache_store(event_cache_store.clone()),
1323                )
1324            })
1325            .build()
1326            .await;
1327
1328        let event_cache = client.event_cache();
1329
1330        // Don't forget to subscribe and like.
1331        event_cache.subscribe().unwrap();
1332
1333        // Let's check whether the generic updates are received for the initialisation.
1334        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1335
1336        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1337        let room = client.get_room(room_id).unwrap();
1338
1339        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1340
1341        // The room event cache has been loaded. A generic update must have been
1342        // triggered.
1343        assert_matches!(
1344            generic_stream.recv().await,
1345            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1346                assert_eq!(room_id, expected_room_id);
1347            }
1348        );
1349        assert!(generic_stream.is_empty());
1350
1351        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
1352
1353        // The initial items contain one event because only the last chunk is loaded by
1354        // default.
1355        assert_eq!(items.len(), 1);
1356        assert_eq!(items[0].event_id().unwrap(), event_id2);
1357        assert!(stream.is_empty());
1358
1359        // The event cache knows only all events though, even if they aren't loaded.
1360        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1361        assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
1362
1363        // Let's paginate to load more events.
1364        room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1365
1366        assert_let_timeout!(
1367            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1368                stream.recv()
1369        );
1370        assert_eq!(diffs.len(), 1);
1371        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1372            assert_eq!(event.event_id().unwrap(), event_id1);
1373        });
1374
1375        assert!(stream.is_empty());
1376
1377        // A generic update is triggered too.
1378        assert_matches!(
1379            generic_stream.recv().await,
1380            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1381                assert_eq!(expected_room_id, room_id);
1382            }
1383        );
1384        assert!(generic_stream.is_empty());
1385
1386        // A new update with one of these events leads to deduplication.
1387        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
1388
1389        room_event_cache
1390            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1391            .await
1392            .unwrap();
1393
1394        // Just checking the generic update is correct. There is a duplicate event, so
1395        // no generic changes whatsoever!
1396        assert!(generic_stream.recv().now_or_never().is_none());
1397
1398        // The stream doesn't report these changes *yet*. Use the items vector given
1399        // when subscribing, to check that the items correspond to their new
1400        // positions. The duplicated item is removed (so it's not the first
1401        // element anymore), and it's added to the back of the list.
1402        let items = room_event_cache.events().await.unwrap();
1403        assert_eq!(items.len(), 2);
1404        assert_eq!(items[0].event_id().unwrap(), event_id1);
1405        assert_eq!(items[1].event_id().unwrap(), event_id2);
1406    }
1407
1408    #[async_test]
1409    async fn test_load_from_storage_resilient_to_failure() {
1410        let room_id = room_id!("!fondue:patate.ch");
1411        let event_cache_store = Arc::new(MemoryStore::new());
1412
1413        let event = EventFactory::new()
1414            .room(room_id)
1415            .sender(user_id!("@ben:saucisse.bzh"))
1416            .text_msg("foo")
1417            .event_id(event_id!("$42"))
1418            .into_event();
1419
1420        // Prefill the store with invalid data: two chunks that form a cycle.
1421        event_cache_store
1422            .handle_linked_chunk_updates(
1423                LinkedChunkId::Room(room_id),
1424                vec![
1425                    Update::NewItemsChunk {
1426                        previous: None,
1427                        new: ChunkIdentifier::new(0),
1428                        next: None,
1429                    },
1430                    Update::PushItems {
1431                        at: Position::new(ChunkIdentifier::new(0), 0),
1432                        items: vec![event],
1433                    },
1434                    Update::NewItemsChunk {
1435                        previous: Some(ChunkIdentifier::new(0)),
1436                        new: ChunkIdentifier::new(1),
1437                        next: Some(ChunkIdentifier::new(0)),
1438                    },
1439                ],
1440            )
1441            .await
1442            .unwrap();
1443
1444        let client = MockClientBuilder::new(None)
1445            .on_builder(|builder| {
1446                builder.store_config(
1447                    StoreConfig::new(CrossProcessLockConfig::multi_process("holder"))
1448                        .event_cache_store(event_cache_store.clone()),
1449                )
1450            })
1451            .build()
1452            .await;
1453
1454        let event_cache = client.event_cache();
1455
1456        // Don't forget to subscribe and like.
1457        event_cache.subscribe().unwrap();
1458
1459        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1460        let room = client.get_room(room_id).unwrap();
1461
1462        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1463
1464        let items = room_event_cache.events().await.unwrap();
1465
1466        // Because the persisted content was invalid, the room store is reset: there are
1467        // no events in the cache.
1468        assert!(items.is_empty());
1469
1470        // Storage doesn't contain anything. It would also be valid that it contains a
1471        // single initial empty items chunk.
1472        let raw_chunks =
1473            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
1474        assert!(raw_chunks.is_empty());
1475    }
1476
1477    #[async_test]
1478    async fn test_no_useless_gaps() {
1479        let room_id = room_id!("!galette:saucisse.bzh");
1480
1481        let client = MockClientBuilder::new(None).build().await;
1482
1483        let event_cache = client.event_cache();
1484        event_cache.subscribe().unwrap();
1485
1486        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1487        let room = client.get_room(room_id).unwrap();
1488        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1489        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1490
1491        let f = EventFactory::new().room(room_id).sender(*ALICE);
1492
1493        // Propagate an update including a limited timeline with one message and a
1494        // prev-batch token.
1495        room_event_cache
1496            .handle_joined_room_update(JoinedRoomUpdate {
1497                timeline: Timeline {
1498                    limited: true,
1499                    prev_batch: Some("raclette".to_owned()),
1500                    events: vec![f.text_msg("hey yo").into_event()],
1501                },
1502                ..Default::default()
1503            })
1504            .await
1505            .unwrap();
1506
1507        // Just checking the generic update is correct.
1508        assert_matches!(
1509            generic_stream.recv().await,
1510            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1511                assert_eq!(expected_room_id, room_id);
1512            }
1513        );
1514        assert!(generic_stream.is_empty());
1515
1516        {
1517            let state = room_event_cache.inner.state.read().await.unwrap();
1518
1519            let mut num_gaps = 0;
1520            let mut num_events = 0;
1521
1522            for c in state.room_linked_chunk().chunks() {
1523                match c.content() {
1524                    ChunkContent::Items(items) => num_events += items.len(),
1525                    ChunkContent::Gap(_) => num_gaps += 1,
1526                }
1527            }
1528
1529            // The limited sync unloads the chunk, so it will appear as if there are only
1530            // the events.
1531            assert_eq!(num_gaps, 0);
1532            assert_eq!(num_events, 1);
1533        }
1534
1535        // But if I manually reload more of the chunk, the gap will be present.
1536        assert_matches!(
1537            room_event_cache.pagination().load_more_events_backwards().await.unwrap(),
1538            LoadMoreEventsBackwardsOutcome::Gap { .. }
1539        );
1540
1541        {
1542            let state = room_event_cache.inner.state.read().await.unwrap();
1543
1544            let mut num_gaps = 0;
1545            let mut num_events = 0;
1546
1547            for c in state.room_linked_chunk().chunks() {
1548                match c.content() {
1549                    ChunkContent::Items(items) => num_events += items.len(),
1550                    ChunkContent::Gap(_) => num_gaps += 1,
1551                }
1552            }
1553
1554            // The gap must have been stored.
1555            assert_eq!(num_gaps, 1);
1556            assert_eq!(num_events, 1);
1557        }
1558
1559        // Now, propagate an update for another message, but the timeline isn't limited
1560        // this time.
1561        room_event_cache
1562            .handle_joined_room_update(JoinedRoomUpdate {
1563                timeline: Timeline {
1564                    limited: false,
1565                    prev_batch: Some("fondue".to_owned()),
1566                    events: vec![f.text_msg("sup").into_event()],
1567                },
1568                ..Default::default()
1569            })
1570            .await
1571            .unwrap();
1572
1573        // Just checking the generic update is correct.
1574        assert_matches!(
1575            generic_stream.recv().await,
1576            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1577                assert_eq!(expected_room_id, room_id);
1578            }
1579        );
1580        assert!(generic_stream.is_empty());
1581
1582        {
1583            let state = room_event_cache.inner.state.read().await.unwrap();
1584
1585            let mut num_gaps = 0;
1586            let mut num_events = 0;
1587
1588            for c in state.room_linked_chunk().chunks() {
1589                match c.content() {
1590                    ChunkContent::Items(items) => num_events += items.len(),
1591                    ChunkContent::Gap(gap) => {
1592                        assert_eq!(gap.token, "raclette");
1593                        num_gaps += 1;
1594                    }
1595                }
1596            }
1597
1598            // There's only the previous gap, no new ones.
1599            assert_eq!(num_gaps, 1);
1600            assert_eq!(num_events, 2);
1601        }
1602    }
1603
1604    #[async_test]
1605    async fn test_shrink_to_last_chunk() {
1606        let room_id = room_id!("!galette:saucisse.bzh");
1607
1608        let client = MockClientBuilder::new(None).build().await;
1609
1610        let f = EventFactory::new().room(room_id);
1611
1612        let evid1 = event_id!("$1");
1613        let evid2 = event_id!("$2");
1614
1615        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1616        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1617
1618        // Fill the event cache store with an initial linked chunk with 2 events chunks.
1619        {
1620            client
1621                .event_cache_store()
1622                .lock()
1623                .await
1624                .expect("Could not acquire the event cache lock")
1625                .as_clean()
1626                .expect("Could not acquire a clean event cache lock")
1627                .handle_linked_chunk_updates(
1628                    LinkedChunkId::Room(room_id),
1629                    vec![
1630                        Update::NewItemsChunk {
1631                            previous: None,
1632                            new: ChunkIdentifier::new(0),
1633                            next: None,
1634                        },
1635                        Update::PushItems {
1636                            at: Position::new(ChunkIdentifier::new(0), 0),
1637                            items: vec![ev1],
1638                        },
1639                        Update::NewItemsChunk {
1640                            previous: Some(ChunkIdentifier::new(0)),
1641                            new: ChunkIdentifier::new(1),
1642                            next: None,
1643                        },
1644                        Update::PushItems {
1645                            at: Position::new(ChunkIdentifier::new(1), 0),
1646                            items: vec![ev2],
1647                        },
1648                    ],
1649                )
1650                .await
1651                .unwrap();
1652        }
1653
1654        let event_cache = client.event_cache();
1655        event_cache.subscribe().unwrap();
1656
1657        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1658        let room = client.get_room(room_id).unwrap();
1659        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1660
1661        // Sanity check: lazily loaded, so only includes one item at start.
1662        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
1663        assert_eq!(events.len(), 1);
1664        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1665        assert!(stream.is_empty());
1666
1667        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1668
1669        // Force loading the full linked chunk by back-paginating.
1670        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1671        assert_eq!(outcome.events.len(), 1);
1672        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1673        assert!(outcome.reached_start);
1674
1675        // We also get an update about the loading from the store.
1676        assert_let_timeout!(
1677            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1678                stream.recv()
1679        );
1680        assert_eq!(diffs.len(), 1);
1681        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1682            assert_eq!(value.event_id().as_deref(), Some(evid1));
1683        });
1684
1685        assert!(stream.is_empty());
1686
1687        // Same for the generic update.
1688        assert_let_timeout!(
1689            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1690        );
1691        assert_eq!(expected_room_id, room_id);
1692        assert!(generic_stream.is_empty());
1693
1694        // Shrink the linked chunk to the last chunk.
1695        room_event_cache
1696            .inner
1697            .state
1698            .write()
1699            .await
1700            .unwrap()
1701            .reload()
1702            .await
1703            .expect("shrinking should succeed");
1704
1705        // We receive updates about the changes to the linked chunk.
1706        assert_let_timeout!(
1707            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1708                stream.recv()
1709        );
1710        assert_eq!(diffs.len(), 2);
1711        assert_matches!(&diffs[0], VectorDiff::Clear);
1712        assert_matches!(&diffs[1], VectorDiff::Append { values} => {
1713            assert_eq!(values.len(), 1);
1714            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
1715        });
1716
1717        assert!(stream.is_empty());
1718
1719        // A generic update has been received.
1720        assert_let_timeout!(Ok(RoomEventCacheGenericUpdate { .. }) = generic_stream.recv());
1721        assert!(generic_stream.is_empty());
1722
1723        // When reading the events, we do get only the last one.
1724        let events = room_event_cache.events().await.unwrap();
1725        assert_eq!(events.len(), 1);
1726        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1727
1728        // But if we back-paginate, we don't need access to network to find out about
1729        // the previous event.
1730        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1731        assert_eq!(outcome.events.len(), 1);
1732        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1733        assert!(outcome.reached_start);
1734    }
1735
1736    #[async_test]
1737    async fn test_room_ordering() {
1738        let room_id = room_id!("!galette:saucisse.bzh");
1739
1740        let client = MockClientBuilder::new(None).build().await;
1741
1742        let f = EventFactory::new().room(room_id).sender(*ALICE);
1743
1744        let evid1 = event_id!("$1");
1745        let evid2 = event_id!("$2");
1746        let evid3 = event_id!("$3");
1747
1748        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
1749        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1750        let ev3 = f.text_msg("yo").event_id(evid3).into_event();
1751
1752        // Fill the event cache store with an initial linked chunk with 2 events chunks.
1753        {
1754            client
1755                .event_cache_store()
1756                .lock()
1757                .await
1758                .expect("Could not acquire the event cache lock")
1759                .as_clean()
1760                .expect("Could not acquire a clean event cache lock")
1761                .handle_linked_chunk_updates(
1762                    LinkedChunkId::Room(room_id),
1763                    vec![
1764                        Update::NewItemsChunk {
1765                            previous: None,
1766                            new: ChunkIdentifier::new(0),
1767                            next: None,
1768                        },
1769                        Update::PushItems {
1770                            at: Position::new(ChunkIdentifier::new(0), 0),
1771                            items: vec![ev1, ev2],
1772                        },
1773                        Update::NewItemsChunk {
1774                            previous: Some(ChunkIdentifier::new(0)),
1775                            new: ChunkIdentifier::new(1),
1776                            next: None,
1777                        },
1778                        Update::PushItems {
1779                            at: Position::new(ChunkIdentifier::new(1), 0),
1780                            items: vec![ev3.clone()],
1781                        },
1782                    ],
1783                )
1784                .await
1785                .unwrap();
1786        }
1787
1788        let event_cache = client.event_cache();
1789        event_cache.subscribe().unwrap();
1790
1791        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1792        let room = client.get_room(room_id).unwrap();
1793        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1794
1795        // Initially, the linked chunk only contains the last chunk, so only ev3 is
1796        // loaded.
1797        {
1798            let state = room_event_cache.inner.state.read().await.unwrap();
1799            let room_linked_chunk = state.room_linked_chunk();
1800
1801            // But we can get the order of ev1.
1802            assert_eq!(
1803                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1804                Some(0)
1805            );
1806
1807            // And that of ev2 as well.
1808            assert_eq!(
1809                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1810                Some(1)
1811            );
1812
1813            // ev3, which is loaded, also has a known ordering.
1814            let mut events = room_linked_chunk.events();
1815            let (pos, ev) = events.next().unwrap();
1816            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
1817            assert_eq!(ev.event_id().as_deref(), Some(evid3));
1818            assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1819
1820            // No other loaded events.
1821            assert!(events.next().is_none());
1822        }
1823
1824        // Force loading the full linked chunk by back-paginating.
1825        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1826        assert!(outcome.reached_start);
1827
1828        // All events are now loaded, so their order is precisely their enumerated index
1829        // in a linear iteration.
1830        {
1831            let state = room_event_cache.inner.state.read().await.unwrap();
1832            let room_linked_chunk = state.room_linked_chunk();
1833
1834            for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
1835                assert_eq!(room_linked_chunk.event_order(pos), Some(i));
1836            }
1837        }
1838
1839        // Handle a gappy sync with two events (including one duplicate, so
1840        // deduplication kicks in), so that the linked chunk is shrunk to the
1841        // last chunk, and that the linked chunk only contains the last two
1842        // events.
1843        let evid4 = event_id!("$4");
1844        room_event_cache
1845            .handle_joined_room_update(JoinedRoomUpdate {
1846                timeline: Timeline {
1847                    limited: true,
1848                    prev_batch: Some("fondue".to_owned()),
1849                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
1850                },
1851                ..Default::default()
1852            })
1853            .await
1854            .unwrap();
1855
1856        {
1857            let state = room_event_cache.inner.state.read().await.unwrap();
1858            let room_linked_chunk = state.room_linked_chunk();
1859
1860            // After the shrink, only evid3 and evid4 are loaded.
1861            let mut events = room_linked_chunk.events();
1862
1863            let (pos, ev) = events.next().unwrap();
1864            assert_eq!(ev.event_id().as_deref(), Some(evid3));
1865            assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1866
1867            let (pos, ev) = events.next().unwrap();
1868            assert_eq!(ev.event_id().as_deref(), Some(evid4));
1869            assert_eq!(room_linked_chunk.event_order(pos), Some(3));
1870
1871            // No other loaded events.
1872            assert!(events.next().is_none());
1873
1874            // But we can still get the order of previous events.
1875            assert_eq!(
1876                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1877                Some(0)
1878            );
1879            assert_eq!(
1880                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1881                Some(1)
1882            );
1883
1884            // ev3 doesn't have an order with its previous position, since it's been
1885            // deduplicated.
1886            assert_eq!(
1887                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
1888                None
1889            );
1890        }
1891    }
1892
1893    #[async_test]
1894    async fn test_auto_shrink_after_all_subscribers_are_gone() {
1895        let room_id = room_id!("!galette:saucisse.bzh");
1896
1897        let client = MockClientBuilder::new(None).build().await;
1898
1899        let f = EventFactory::new().room(room_id);
1900
1901        let evid1 = event_id!("$1");
1902        let evid2 = event_id!("$2");
1903
1904        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1905        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1906
1907        // Fill the event cache store with an initial linked chunk with 2 events chunks.
1908        {
1909            client
1910                .event_cache_store()
1911                .lock()
1912                .await
1913                .expect("Could not acquire the event cache lock")
1914                .as_clean()
1915                .expect("Could not acquire a clean event cache lock")
1916                .handle_linked_chunk_updates(
1917                    LinkedChunkId::Room(room_id),
1918                    vec![
1919                        Update::NewItemsChunk {
1920                            previous: None,
1921                            new: ChunkIdentifier::new(0),
1922                            next: None,
1923                        },
1924                        Update::PushItems {
1925                            at: Position::new(ChunkIdentifier::new(0), 0),
1926                            items: vec![ev1],
1927                        },
1928                        Update::NewItemsChunk {
1929                            previous: Some(ChunkIdentifier::new(0)),
1930                            new: ChunkIdentifier::new(1),
1931                            next: None,
1932                        },
1933                        Update::PushItems {
1934                            at: Position::new(ChunkIdentifier::new(1), 0),
1935                            items: vec![ev2],
1936                        },
1937                    ],
1938                )
1939                .await
1940                .unwrap();
1941        }
1942
1943        let event_cache = client.event_cache();
1944        event_cache.subscribe().unwrap();
1945
1946        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1947        let room = client.get_room(room_id).unwrap();
1948        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1949
1950        // Sanity check: lazily loaded, so only includes one item at start.
1951        let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
1952        assert_eq!(events1.len(), 1);
1953        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
1954        assert!(stream1.is_empty());
1955
1956        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1957
1958        // Force loading the full linked chunk by back-paginating.
1959        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1960        assert_eq!(outcome.events.len(), 1);
1961        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1962        assert!(outcome.reached_start);
1963
1964        // We also get an update about the loading from the store. Ignore it, for this
1965        // test's sake.
1966        assert_let_timeout!(
1967            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1968                stream1.recv()
1969        );
1970        assert_eq!(diffs.len(), 1);
1971        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1972            assert_eq!(value.event_id().as_deref(), Some(evid1));
1973        });
1974
1975        assert!(stream1.is_empty());
1976
1977        assert_let_timeout!(
1978            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1979        );
1980        assert_eq!(expected_room_id, room_id);
1981        assert!(generic_stream.is_empty());
1982
1983        // Have another subscriber.
1984        // Since it's not the first one, and the previous one loaded some more events,
1985        // the second subscribers sees them all.
1986        let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
1987        assert_eq!(events2.len(), 2);
1988        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
1989        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
1990        assert!(stream2.is_empty());
1991
1992        // Drop the first stream, and wait a bit.
1993        drop(stream1);
1994        yield_now().await;
1995
1996        // The second stream remains undisturbed.
1997        assert!(stream2.is_empty());
1998
1999        // Now drop the second stream, and wait a bit.
2000        drop(stream2);
2001        yield_now().await;
2002
2003        // The linked chunk must have auto-shrunk by now.
2004
2005        {
2006            // Check the inner state: there's no more shared auto-shrinker.
2007            let state = room_event_cache.inner.state.read().await.unwrap();
2008            assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
2009        }
2010
2011        // Getting the events will only give us the latest chunk.
2012        let events3 = room_event_cache.events().await.unwrap();
2013        assert_eq!(events3.len(), 1);
2014        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
2015    }
2016
2017    #[async_test]
2018    async fn test_rfind_map_event_in_memory_by() {
2019        let user_id = user_id!("@mnt_io:matrix.org");
2020        let room_id = room_id!("!raclette:patate.ch");
2021        let client = MockClientBuilder::new(None).build().await;
2022
2023        let event_factory = EventFactory::new().room(room_id);
2024
2025        let event_id_0 = event_id!("$ev0");
2026        let event_id_1 = event_id!("$ev1");
2027        let event_id_2 = event_id!("$ev2");
2028        let event_id_3 = event_id!("$ev3");
2029
2030        let event_0 =
2031            event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
2032        let event_1 =
2033            event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
2034        let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
2035        let event_3 =
2036            event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
2037
2038        // Fill the event cache store with an initial linked chunk of 2 chunks, and 4
2039        // events.
2040        {
2041            client
2042                .event_cache_store()
2043                .lock()
2044                .await
2045                .expect("Could not acquire the event cache lock")
2046                .as_clean()
2047                .expect("Could not acquire a clean event cache lock")
2048                .handle_linked_chunk_updates(
2049                    LinkedChunkId::Room(room_id),
2050                    vec![
2051                        Update::NewItemsChunk {
2052                            previous: None,
2053                            new: ChunkIdentifier::new(0),
2054                            next: None,
2055                        },
2056                        Update::PushItems {
2057                            at: Position::new(ChunkIdentifier::new(0), 0),
2058                            items: vec![event_3],
2059                        },
2060                        Update::NewItemsChunk {
2061                            previous: Some(ChunkIdentifier::new(0)),
2062                            new: ChunkIdentifier::new(1),
2063                            next: None,
2064                        },
2065                        Update::PushItems {
2066                            at: Position::new(ChunkIdentifier::new(1), 0),
2067                            items: vec![event_0, event_1, event_2],
2068                        },
2069                    ],
2070                )
2071                .await
2072                .unwrap();
2073        }
2074
2075        let event_cache = client.event_cache();
2076        event_cache.subscribe().unwrap();
2077
2078        client.base_client().get_or_create_room(room_id, RoomState::Joined);
2079        let room = client.get_room(room_id).unwrap();
2080        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2081
2082        // Look for an event from `BOB`: it must be `event_0`.
2083        assert_matches!(
2084            room_event_cache
2085                .rfind_map_event_in_memory_by(|event| {
2086                    (event.sender().as_deref() == Some(*BOB)).then(|| event.event_id())
2087                })
2088                .await,
2089            Ok(Some(event_id)) => {
2090                assert_eq!(event_id.as_deref(), Some(event_id_0));
2091            }
2092        );
2093
2094        // Look for an event from `ALICE`: it must be `event_2`, right before `event_1`
2095        // because events are looked for in reverse order.
2096        assert_matches!(
2097            room_event_cache
2098                .rfind_map_event_in_memory_by(|event| {
2099                    (event.sender().as_deref() == Some(*ALICE)).then(|| event.event_id())
2100                })
2101                .await,
2102            Ok(Some(event_id)) => {
2103                assert_eq!(event_id.as_deref(), Some(event_id_2));
2104            }
2105        );
2106
2107        // Look for an event that is inside the storage, but not loaded.
2108        assert!(
2109            room_event_cache
2110                .rfind_map_event_in_memory_by(|event| {
2111                    (event.sender().as_deref() == Some(user_id)).then(|| event.event_id())
2112                })
2113                .await
2114                .unwrap()
2115                .is_none()
2116        );
2117
2118        // Look for an event that doesn't exist.
2119        assert!(
2120            room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap().is_none()
2121        );
2122    }
2123
2124    #[async_test]
2125    async fn test_reload_when_dirty() {
2126        let user_id = user_id!("@mnt_io:matrix.org");
2127        let room_id = room_id!("!raclette:patate.ch");
2128
2129        // The storage shared by the two clients.
2130        let event_cache_store = MemoryStore::new();
2131
2132        // Client for the process 0.
2133        let client_p0 = MockClientBuilder::new(None)
2134            .on_builder(|builder| {
2135                builder.store_config(
2136                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2137                        .event_cache_store(event_cache_store.clone()),
2138                )
2139            })
2140            .build()
2141            .await;
2142
2143        // Client for the process 1.
2144        let client_p1 = MockClientBuilder::new(None)
2145            .on_builder(|builder| {
2146                builder.store_config(
2147                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2148                        .event_cache_store(event_cache_store),
2149                )
2150            })
2151            .build()
2152            .await;
2153
2154        let event_factory = EventFactory::new().room(room_id).sender(user_id);
2155
2156        let ev_id_0 = event_id!("$ev_0");
2157        let ev_id_1 = event_id!("$ev_1");
2158
2159        let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
2160        let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
2161
2162        // Add events to the storage (shared by the two clients!).
2163        client_p0
2164            .event_cache_store()
2165            .lock()
2166            .await
2167            .expect("[p0] Could not acquire the event cache lock")
2168            .as_clean()
2169            .expect("[p0] Could not acquire a clean event cache lock")
2170            .handle_linked_chunk_updates(
2171                LinkedChunkId::Room(room_id),
2172                vec![
2173                    Update::NewItemsChunk {
2174                        previous: None,
2175                        new: ChunkIdentifier::new(0),
2176                        next: None,
2177                    },
2178                    Update::PushItems {
2179                        at: Position::new(ChunkIdentifier::new(0), 0),
2180                        items: vec![ev_0],
2181                    },
2182                    Update::NewItemsChunk {
2183                        previous: Some(ChunkIdentifier::new(0)),
2184                        new: ChunkIdentifier::new(1),
2185                        next: None,
2186                    },
2187                    Update::PushItems {
2188                        at: Position::new(ChunkIdentifier::new(1), 0),
2189                        items: vec![ev_1],
2190                    },
2191                ],
2192            )
2193            .await
2194            .unwrap();
2195
2196        // Subscribe the event caches, and create the room.
2197        let (room_event_cache_p0, room_event_cache_p1) = {
2198            let event_cache_p0 = client_p0.event_cache();
2199            event_cache_p0.subscribe().unwrap();
2200
2201            let event_cache_p1 = client_p1.event_cache();
2202            event_cache_p1.subscribe().unwrap();
2203
2204            client_p0.base_client().get_or_create_room(room_id, RoomState::Joined);
2205            client_p1.base_client().get_or_create_room(room_id, RoomState::Joined);
2206
2207            let (room_event_cache_p0, _drop_handles) =
2208                client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
2209            let (room_event_cache_p1, _drop_handles) =
2210                client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
2211
2212            (room_event_cache_p0, room_event_cache_p1)
2213        };
2214
2215        // Okay. We are ready for the test!
2216        //
2217        // First off, let's check `room_event_cache_p0` has access to the first event
2218        // loaded in-memory, then do a pagination, and see more events.
2219        let mut updates_stream_p0 = {
2220            let room_event_cache = &room_event_cache_p0;
2221
2222            let (initial_updates, mut updates_stream) =
2223                room_event_cache_p0.subscribe().await.unwrap();
2224
2225            // Initial updates contain `ev_id_1` only.
2226            assert_eq!(initial_updates.len(), 1);
2227            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2228            assert!(updates_stream.is_empty());
2229
2230            // `ev_id_1` must be loaded in memory.
2231            assert!(event_loaded(room_event_cache, ev_id_1).await);
2232
2233            // `ev_id_0` must NOT be loaded in memory.
2234            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2235
2236            // Load one more event with a backpagination.
2237            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2238
2239            // A new update for `ev_id_0` must be present.
2240            assert_matches!(
2241                updates_stream.recv().await.unwrap(),
2242                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2243                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
2244                    assert_matches!(
2245                        &diffs[0],
2246                        VectorDiff::Insert { index: 0, value: event } => {
2247                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2248                        }
2249                    );
2250                }
2251            );
2252
2253            // `ev_id_0` must now be loaded in memory.
2254            assert!(event_loaded(room_event_cache, ev_id_0).await);
2255
2256            updates_stream
2257        };
2258
2259        // Second, let's check `room_event_cache_p1` has the same accesses.
2260        let mut updates_stream_p1 = {
2261            let room_event_cache = &room_event_cache_p1;
2262            let (initial_updates, mut updates_stream) =
2263                room_event_cache_p1.subscribe().await.unwrap();
2264
2265            // Initial updates contain `ev_id_1` only.
2266            assert_eq!(initial_updates.len(), 1);
2267            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2268            assert!(updates_stream.is_empty());
2269
2270            // `ev_id_1` must be loaded in memory.
2271            assert!(event_loaded(room_event_cache, ev_id_1).await);
2272
2273            // `ev_id_0` must NOT be loaded in memory.
2274            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2275
2276            // Load one more event with a backpagination.
2277            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2278
2279            // A new update for `ev_id_0` must be present.
2280            assert_matches!(
2281                updates_stream.recv().await.unwrap(),
2282                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2283                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
2284                    assert_matches!(
2285                        &diffs[0],
2286                        VectorDiff::Insert { index: 0, value: event } => {
2287                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2288                        }
2289                    );
2290                }
2291            );
2292
2293            // `ev_id_0` must now be loaded in memory.
2294            assert!(event_loaded(room_event_cache, ev_id_0).await);
2295
2296            updates_stream
2297        };
2298
2299        // Do this a couple times, for the fun.
2300        for _ in 0..3 {
2301            // Third, because `room_event_cache_p1` has locked the store, the lock
2302            // is dirty for `room_event_cache_p0`, so it will shrink to its last
2303            // chunk!
2304            {
2305                let room_event_cache = &room_event_cache_p0;
2306                let updates_stream = &mut updates_stream_p0;
2307
2308                // `ev_id_1` must be loaded in memory, just like before.
2309                assert!(event_loaded(room_event_cache, ev_id_1).await);
2310
2311                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
2312                // state has been reloaded to its last chunk.
2313                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2314
2315                // The reload can be observed via the updates too.
2316                assert_matches!(
2317                    updates_stream.recv().await.unwrap(),
2318                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2319                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2320                        assert_matches!(&diffs[0], VectorDiff::Clear);
2321                        assert_matches!(
2322                            &diffs[1],
2323                            VectorDiff::Append { values: events } => {
2324                                assert_eq!(events.len(), 1);
2325                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2326                            }
2327                        );
2328                    }
2329                );
2330
2331                // Load one more event with a backpagination.
2332                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2333
2334                // `ev_id_0` must now be loaded in memory.
2335                assert!(event_loaded(room_event_cache, ev_id_0).await);
2336
2337                // The pagination can be observed via the updates too.
2338                assert_matches!(
2339                    updates_stream.recv().await.unwrap(),
2340                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2341                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2342                        assert_matches!(
2343                            &diffs[0],
2344                            VectorDiff::Insert { index: 0, value: event } => {
2345                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2346                            }
2347                        );
2348                    }
2349                );
2350            }
2351
2352            // Fourth, because `room_event_cache_p0` has locked the store again, the lock
2353            // is dirty for `room_event_cache_p1` too!, so it will shrink to its last
2354            // chunk!
2355            {
2356                let room_event_cache = &room_event_cache_p1;
2357                let updates_stream = &mut updates_stream_p1;
2358
2359                // `ev_id_1` must be loaded in memory, just like before.
2360                assert!(event_loaded(room_event_cache, ev_id_1).await);
2361
2362                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
2363                // state has shrunk to its last chunk.
2364                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2365
2366                // The reload can be observed via the updates too.
2367                assert_matches!(
2368                    updates_stream.recv().await.unwrap(),
2369                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2370                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2371                        assert_matches!(&diffs[0], VectorDiff::Clear);
2372                        assert_matches!(
2373                            &diffs[1],
2374                            VectorDiff::Append { values: events } => {
2375                                assert_eq!(events.len(), 1);
2376                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2377                            }
2378                        );
2379                    }
2380                );
2381
2382                // Load one more event with a backpagination.
2383                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2384
2385                // `ev_id_0` must now be loaded in memory.
2386                assert!(event_loaded(room_event_cache, ev_id_0).await);
2387
2388                // The pagination can be observed via the updates too.
2389                assert_matches!(
2390                    updates_stream.recv().await.unwrap(),
2391                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2392                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2393                        assert_matches!(
2394                            &diffs[0],
2395                            VectorDiff::Insert { index: 0, value: event } => {
2396                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2397                            }
2398                        );
2399                    }
2400                );
2401            }
2402        }
2403
2404        // Repeat that with an explicit read lock (so that we don't rely on
2405        // `event_loaded` to trigger the dirty detection).
2406        for _ in 0..3 {
2407            {
2408                let room_event_cache = &room_event_cache_p0;
2409                let updates_stream = &mut updates_stream_p0;
2410
2411                let guard = room_event_cache.inner.state.read().await.unwrap();
2412
2413                // Guard is kept alive, to ensure we can have multiple read guards alive with a
2414                // shared access.
2415                // See `RoomEventCacheStateLock::read` to learn more.
2416
2417                // The lock is no longer marked as dirty, it's been cleaned.
2418                assert!(guard.is_dirty().not());
2419
2420                // The reload can be observed via the updates too.
2421                assert_matches!(
2422                    updates_stream.recv().await.unwrap(),
2423                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2424                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2425                        assert_matches!(&diffs[0], VectorDiff::Clear);
2426                        assert_matches!(
2427                            &diffs[1],
2428                            VectorDiff::Append { values: events } => {
2429                                assert_eq!(events.len(), 1);
2430                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2431                            }
2432                        );
2433                    }
2434                );
2435
2436                assert!(event_loaded(room_event_cache, ev_id_1).await);
2437                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2438
2439                // Ensure `guard` is alive up to this point (in case this test is refactored, I
2440                // want to make this super explicit).
2441                //
2442                // We drop need to drop it before the pagination because the pagination needs to
2443                // obtain a write lock.
2444                drop(guard);
2445
2446                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2447                assert!(event_loaded(room_event_cache, ev_id_0).await);
2448
2449                // The pagination can be observed via the updates too.
2450                assert_matches!(
2451                    updates_stream.recv().await.unwrap(),
2452                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2453                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2454                        assert_matches!(
2455                            &diffs[0],
2456                            VectorDiff::Insert { index: 0, value: event } => {
2457                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2458                            }
2459                        );
2460                    }
2461                );
2462            }
2463
2464            {
2465                let room_event_cache = &room_event_cache_p1;
2466                let updates_stream = &mut updates_stream_p1;
2467
2468                let guard = room_event_cache.inner.state.read().await.unwrap();
2469
2470                // Guard is kept alive, to ensure we can have multiple read guards alive with a
2471                // shared access.
2472
2473                // The lock is no longer marked as dirty, it's been cleaned.
2474                assert!(guard.is_dirty().not());
2475
2476                // The reload can be observed via the updates too.
2477                assert_matches!(
2478                    updates_stream.recv().await.unwrap(),
2479                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2480                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2481                        assert_matches!(&diffs[0], VectorDiff::Clear);
2482                        assert_matches!(
2483                            &diffs[1],
2484                            VectorDiff::Append { values: events } => {
2485                                assert_eq!(events.len(), 1);
2486                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2487                            }
2488                        );
2489                    }
2490                );
2491
2492                assert!(event_loaded(room_event_cache, ev_id_1).await);
2493                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2494
2495                // Ensure `guard` is alive up to this point (in case this test is refactored, I
2496                // want to make this super explicit).
2497                //
2498                // We drop need to drop it before the pagination because the pagination needs to
2499                // obtain a write lock.
2500                drop(guard);
2501
2502                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2503                assert!(event_loaded(room_event_cache, ev_id_0).await);
2504
2505                // The pagination can be observed via the updates too.
2506                assert_matches!(
2507                    updates_stream.recv().await.unwrap(),
2508                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2509                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2510                        assert_matches!(
2511                            &diffs[0],
2512                            VectorDiff::Insert { index: 0, value: event } => {
2513                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2514                            }
2515                        );
2516                    }
2517                );
2518            }
2519        }
2520
2521        // Repeat that with an explicit write lock.
2522        for _ in 0..3 {
2523            {
2524                let room_event_cache = &room_event_cache_p0;
2525                let updates_stream = &mut updates_stream_p0;
2526
2527                let guard = room_event_cache.inner.state.write().await.unwrap();
2528
2529                // The lock is no longer marked as dirty, it's been cleaned.
2530                assert!(guard.is_dirty().not());
2531
2532                // The reload can be observed via the updates too.
2533                assert_matches!(
2534                    updates_stream.recv().await.unwrap(),
2535                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2536                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2537                        assert_matches!(&diffs[0], VectorDiff::Clear);
2538                        assert_matches!(
2539                            &diffs[1],
2540                            VectorDiff::Append { values: events } => {
2541                                assert_eq!(events.len(), 1);
2542                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2543                            }
2544                        );
2545                    }
2546                );
2547
2548                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
2549                // needs to obtain a read lock.
2550                drop(guard);
2551
2552                assert!(event_loaded(room_event_cache, ev_id_1).await);
2553                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2554
2555                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2556                assert!(event_loaded(room_event_cache, ev_id_0).await);
2557
2558                // The pagination can be observed via the updates too.
2559                assert_matches!(
2560                    updates_stream.recv().await.unwrap(),
2561                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2562                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2563                        assert_matches!(
2564                            &diffs[0],
2565                            VectorDiff::Insert { index: 0, value: event } => {
2566                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2567                            }
2568                        );
2569                    }
2570                );
2571            }
2572
2573            {
2574                let room_event_cache = &room_event_cache_p1;
2575                let updates_stream = &mut updates_stream_p1;
2576
2577                let guard = room_event_cache.inner.state.write().await.unwrap();
2578
2579                // The lock is no longer marked as dirty, it's been cleaned.
2580                assert!(guard.is_dirty().not());
2581
2582                // The reload can be observed via the updates too.
2583                assert_matches!(
2584                    updates_stream.recv().await.unwrap(),
2585                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2586                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2587                        assert_matches!(&diffs[0], VectorDiff::Clear);
2588                        assert_matches!(
2589                            &diffs[1],
2590                            VectorDiff::Append { values: events } => {
2591                                assert_eq!(events.len(), 1);
2592                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2593                            }
2594                        );
2595                    }
2596                );
2597
2598                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
2599                // needs to obtain a read lock.
2600                drop(guard);
2601
2602                assert!(event_loaded(room_event_cache, ev_id_1).await);
2603                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2604
2605                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2606                assert!(event_loaded(room_event_cache, ev_id_0).await);
2607
2608                // The pagination can be observed via the updates too.
2609                assert_matches!(
2610                    updates_stream.recv().await.unwrap(),
2611                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2612                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2613                        assert_matches!(
2614                            &diffs[0],
2615                            VectorDiff::Insert { index: 0, value: event } => {
2616                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2617                            }
2618                        );
2619                    }
2620                );
2621            }
2622        }
2623    }
2624
2625    #[async_test]
2626    async fn test_load_when_dirty() {
2627        let room_id_0 = room_id!("!raclette:patate.ch");
2628        let room_id_1 = room_id!("!morbiflette:patate.ch");
2629
2630        // The storage shared by the two clients.
2631        let event_cache_store = MemoryStore::new();
2632
2633        // Client for the process 0.
2634        let client_p0 = MockClientBuilder::new(None)
2635            .on_builder(|builder| {
2636                builder.store_config(
2637                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2638                        .event_cache_store(event_cache_store.clone()),
2639                )
2640            })
2641            .build()
2642            .await;
2643
2644        // Client for the process 1.
2645        let client_p1 = MockClientBuilder::new(None)
2646            .on_builder(|builder| {
2647                builder.store_config(
2648                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2649                        .event_cache_store(event_cache_store),
2650                )
2651            })
2652            .build()
2653            .await;
2654
2655        // Subscribe the event caches, and create the room.
2656        let (room_event_cache_0_p0, room_event_cache_0_p1) = {
2657            let event_cache_p0 = client_p0.event_cache();
2658            event_cache_p0.subscribe().unwrap();
2659
2660            let event_cache_p1 = client_p1.event_cache();
2661            event_cache_p1.subscribe().unwrap();
2662
2663            client_p0.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2664            client_p0.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2665
2666            client_p1.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2667            client_p1.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2668
2669            let (room_event_cache_0_p0, _drop_handles) =
2670                client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2671            let (room_event_cache_0_p1, _drop_handles) =
2672                client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2673
2674            (room_event_cache_0_p0, room_event_cache_0_p1)
2675        };
2676
2677        // Let's make the cross-process lock over the store dirty.
2678        {
2679            drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
2680            drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
2681        }
2682
2683        // Create the `RoomEventCache` for `room_id_1`. During its creation, the
2684        // cross-process lock over the store MUST be dirty, which makes no difference as
2685        // a clean one: the state is just loaded, not reloaded.
2686        let (room_event_cache_1_p0, _) =
2687            client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
2688
2689        // Check the lock isn't dirty because it's been cleared.
2690        {
2691            let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
2692            assert!(guard.is_dirty().not());
2693        }
2694
2695        // The only way to test this behaviour is to see that the dirty block in
2696        // `RoomEventCacheStateLock` is covered by this test.
2697    }
2698
2699    #[async_test]
2700    async fn test_uniq_read_marker() {
2701        let client = MockClientBuilder::new(None).build().await;
2702        let room_id = room_id!("!galette:saucisse.bzh");
2703        client.base_client().get_or_create_room(room_id, RoomState::Joined);
2704
2705        let event_cache = client.event_cache();
2706
2707        event_cache.subscribe().unwrap();
2708
2709        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2710        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
2711        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
2712
2713        assert!(events.is_empty());
2714
2715        // When sending multiple times the same read marker event,…
2716        let read_marker_event = Raw::from_json_string(
2717            json!({
2718                "content": {
2719                    "event_id": "$crepe:saucisse.bzh"
2720                },
2721                "room_id": "!galette:saucisse.bzh",
2722                "type": "m.fully_read"
2723            })
2724            .to_string(),
2725        )
2726        .unwrap();
2727        let account_data = vec![read_marker_event; 100];
2728
2729        room_event_cache
2730            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
2731            .await
2732            .unwrap();
2733
2734        // … there's only one read marker update.
2735        assert_matches!(
2736            stream.recv().await.unwrap(),
2737            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
2738        );
2739
2740        assert!(stream.recv().now_or_never().is_none());
2741
2742        // None, because an account data doesn't trigger a generic update.
2743        assert!(generic_stream.recv().now_or_never().is_none());
2744    }
2745
2746    async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
2747        room_event_cache
2748            .rfind_map_event_in_memory_by(|event| {
2749                (event.event_id().as_deref() == Some(event_id)).then_some(())
2750            })
2751            .await
2752            .unwrap()
2753            .is_some()
2754    }
2755}