// matrix_sdk/event_cache/caches/room/mod.rs

// Copyright 2026 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod pagination;
mod state;
mod subscriber;
mod updates;

use std::{
    collections::BTreeMap,
    fmt,
    sync::{Arc, atomic::Ordering},
};

use eyeball::SharedObservable;
use matrix_sdk_base::{
    deserialized_responses::AmbiguityChange,
    event_cache::Event,
    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
    EventId, OwnedEventId, OwnedRoomId, RoomId,
    events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
    serde::Raw,
};
pub(super) use state::{LockedRoomEventCacheState, RoomEventCacheStateLockWriteGuard};
pub use subscriber::RoomEventCacheSubscriber;
use tokio::sync::{Notify, broadcast::Receiver, mpsc};
use tracing::{instrument, trace, warn};
pub use updates::{
    RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate,
    RoomEventCacheUpdateSender,
};

use super::{
    super::{AutoShrinkChannelPayload, EventCacheError, EventsOrigin, Result, RoomPagination},
    TimelineVectorDiffs,
    event_linked_chunk::sort_positions_descending,
    thread::pagination::ThreadPagination,
};
use crate::{
    client::WeakClient,
    event_cache::{
        EventFocusThreadMode,
        caches::{event_focused::EventFocusedCache, pagination::SharedPaginationStatus},
    },
    room::WeakRoom,
};
/// A subset of an event cache, for a room.
///
/// Cloning is shallow, and thus is cheap to do.
#[derive(Clone)]
pub struct RoomEventCache {
    // Shared, non-cloneable implementation; all clones point at the same state.
    inner: Arc<RoomEventCacheInner>,
}
68
69impl fmt::Debug for RoomEventCache {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        f.debug_struct("RoomEventCache").finish_non_exhaustive()
72    }
73}
74
75impl RoomEventCache {
76    /// Create a new [`RoomEventCache`] using the given room and store.
77    pub(super) fn new(
78        room_id: OwnedRoomId,
79        weak_room: WeakRoom,
80        state: LockedRoomEventCacheState,
81        shared_pagination_status: SharedObservable<SharedPaginationStatus>,
82        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
83        update_sender: RoomEventCacheUpdateSender,
84    ) -> Self {
85        Self {
86            inner: Arc::new(RoomEventCacheInner::new(
87                room_id,
88                weak_room,
89                state,
90                shared_pagination_status,
91                auto_shrink_sender,
92                update_sender,
93            )),
94        }
95    }
96
97    /// Get the room ID for this [`RoomEventCache`].
98    pub fn room_id(&self) -> &RoomId {
99        &self.inner.room_id
100    }
101
102    /// Read all current events.
103    ///
104    /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
105    /// subscriber.
106    pub async fn events(&self) -> Result<Vec<Event>> {
107        let state = self.inner.state.read().await?;
108
109        Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
110    }
111
112    /// Subscribe to this room updates, after getting the initial list of
113    /// events.
114    ///
115    /// Use [`RoomEventCache::events`] to get all current events without the
116    /// subscriber. Creating, and especially dropping, a
117    /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
118    pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
119        let state = self.inner.state.read().await?;
120        let events =
121            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
122
123        let subscriber_count = state.subscriber_count();
124        let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
125        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
126
127        let subscriber = RoomEventCacheSubscriber::new(
128            self.inner.update_sender.new_room_receiver(),
129            self.inner.room_id.clone(),
130            self.inner.auto_shrink_sender.clone(),
131            subscriber_count.clone(),
132        );
133
134        Ok((events, subscriber))
135    }
136
137    /// Subscribe to thread for a given root event, and get a (maybe empty)
138    /// initially known list of events for that thread.
139    pub async fn subscribe_to_thread(
140        &self,
141        thread_root: OwnedEventId,
142    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
143        let mut state = self.inner.state.write().await?;
144
145        state.subscribe_to_thread(thread_root).await
146    }
147
148    /// Subscribe to the pinned event cache for this room.
149    ///
150    /// This is a persisted view over the pinned events of a room.
151    ///
152    /// The pinned events will be initially reloaded from storage, and/or loaded
153    /// from a network request to fetch the latest pinned events and their
154    /// relations, to update it as needed. The list of pinned events will
155    /// also be kept up-to-date as new events are pinned, and new
156    /// related events show up from other sources.
157    pub async fn subscribe_to_pinned_events(
158        &self,
159    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
160        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
161        let state = self.inner.state.read().await?;
162
163        state.subscribe_to_pinned_events(room).await
164    }
165
166    /// Create or get an event-focused timeline cache for this room.
167    ///
168    /// This creates a timeline centered around a specific event (e.g., for
169    /// permalinks), in a given mode, supporting both forward and backward
170    /// pagination.
171    ///
172    /// If the focused event is part of a thread, the timeline will
173    /// automatically use thread-specific pagination.
174    ///
175    /// If the thread mode is defined to [`EventFocusThreadMode::ForceThread`],
176    /// the timeline will be focused on the thread root of the thread the
177    /// target event belongs to, or it will consider that the target event
178    /// itself is the thread root.
179    #[instrument(skip(self), fields(room_id = %self.inner.room_id, event_id = %event_id, thread_mode = ?thread_mode))]
180    pub async fn get_or_create_event_focused_cache(
181        &self,
182        event_id: OwnedEventId,
183        num_context_events: u16,
184        thread_mode: EventFocusThreadMode,
185    ) -> Result<EventFocusedCache> {
186        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
187        let guard = self.inner.state.read().await?;
188
189        // Check if we already have a cache for this event.
190        if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
191            trace!("the cache was already created, returning it");
192            return Ok(cache);
193        }
194
195        // Create a new cache.
196        let linked_chunk_update_sender = guard.state.linked_chunk_update_sender.clone();
197
198        // Make sure to drop the guard before calling `start_from` below, as it may need
199        // to lock the room event cache's state again, when memoizing events
200        // received from the network response.
201        drop(guard);
202
203        let room_id = room.room_id().to_owned();
204        let weak_room = WeakRoom::new(WeakClient::from_client(&room.client()), room_id.clone());
205
206        trace!("creating a fresh event-focused cache");
207        let cache = EventFocusedCache::new(weak_room, event_id.clone(), linked_chunk_update_sender);
208
209        // Initialize the cache from the server.
210        cache.start_from(room, num_context_events, thread_mode).await?;
211
212        let mut guard = self.inner.state.write().await?;
213
214        // Check again if we already have a cache for this event, just in case there was
215        // a race with another caller during initialization.
216        if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
217            trace!("another cache has been racily created, returning it");
218            return Ok(cache);
219        }
220
221        // Insert the cache in the map.
222        guard.insert_event_focused_cache(event_id, thread_mode, cache.clone());
223
224        Ok(cache)
225    }
226
227    /// Get an event-focused cache for this event and thread mode, if it exists.
228    ///
229    /// Otherwise, returns `None`.
230    ///
231    /// Use [`Self::get_or_create_event_focused_cache`] for ensuring such a
232    /// cache exists.
233    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
234    pub async fn get_event_focused_cache(
235        &self,
236        event_id: OwnedEventId,
237        thread_mode: EventFocusThreadMode,
238    ) -> Result<Option<EventFocusedCache>> {
239        Ok(self.inner.state.read().await?.get_event_focused_cache(event_id, thread_mode))
240    }
241
242    /// Return a [`RoomPagination`] type useful for running back-pagination
243    /// queries in the current room.
244    pub fn pagination(&self) -> RoomPagination {
245        RoomPagination::new(self.inner.clone())
246    }
247
248    /// Return a [`ThreadPagination`] type useful for running back-pagination
249    /// queries in the `thread_id` thread.
250    pub async fn thread_pagination(&self, thread_id: OwnedEventId) -> Result<ThreadPagination> {
251        Ok(self.inner.state.write().await?.get_or_reload_thread(thread_id).pagination())
252    }
253
254    /// Try to find a single event in this room, starting from the most recent
255    /// event.
256    ///
257    /// The `predicate` receives the current event as its single argument.
258    ///
259    /// **Warning**! It looks into the loaded events from the in-memory linked
260    /// chunk **only**. It doesn't look inside the storage.
261    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
262    where
263        P: FnMut(&Event) -> Option<O>,
264    {
265        Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
266    }
267
268    /// Try to find an event by ID in this room.
269    ///
270    /// It starts by looking into loaded events before looking inside the
271    /// storage.
272    pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
273        Ok(self
274            .inner
275            .state
276            .read()
277            .await?
278            .find_event(event_id)
279            .await
280            .ok()
281            .flatten()
282            .map(|(_loc, event)| event))
283    }
284
285    /// Try to find an event by ID in this room, along with its related events.
286    ///
287    /// You can filter which types of related events to retrieve using
288    /// `filter`. `None` will retrieve related events of any type.
289    ///
290    /// The related events are sorted like this:
291    ///
292    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
293    ///   located at the beginning of the array.
294    /// - events present in the linked chunk (be it in memory or in the storage)
295    ///   will be sorted according to their ordering in the linked chunk.
296    pub async fn find_event_with_relations(
297        &self,
298        event_id: &EventId,
299        filter: Option<Vec<RelationType>>,
300    ) -> Result<Option<(Event, Vec<Event>)>> {
301        // Search in all loaded or stored events.
302        Ok(self
303            .inner
304            .state
305            .read()
306            .await?
307            .find_event_with_relations(event_id, filter.clone())
308            .await
309            .ok()
310            .flatten())
311    }
312
313    /// Try to find the related events for an event by ID in this room.
314    ///
315    /// You can filter which types of related events to retrieve using
316    /// `filter`. `None` will retrieve related events of any type.
317    ///
318    /// The related events are sorted like this:
319    ///
320    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
321    ///   located at the beginning of the array.
322    /// - events present in the linked chunk (be it in memory or in the storage)
323    ///   will be sorted according to their ordering in the linked chunk.
324    pub async fn find_event_relations(
325        &self,
326        event_id: &EventId,
327        filter: Option<Vec<RelationType>>,
328    ) -> Result<Vec<Event>> {
329        // Search in all loaded or stored events.
330        self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
331    }
332
333    /// Clear all the storage for this [`RoomEventCache`].
334    ///
335    /// This will get rid of all the events from the linked chunk and persisted
336    /// storage.
337    pub async fn clear(&self) -> Result<()> {
338        // Clear the linked chunk and persisted storage.
339        let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
340
341        // Notify observers about the update.
342        self.inner.update_sender.send(
343            RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
344                diffs: updates_as_vector_diffs,
345                origin: EventsOrigin::Cache,
346            }),
347            Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
348        );
349
350        Ok(())
351    }
352
353    /// Return a reference to the state.
354    pub(in super::super) fn state(&self) -> &LockedRoomEventCacheState {
355        &self.inner.state
356    }
357
358    /// Handle a [`JoinedRoomUpdate`].
359    #[instrument(skip_all, fields(room_id = %self.room_id()))]
360    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
361        self.inner
362            .handle_timeline(updates.timeline, updates.ephemeral.clone(), updates.ambiguity_changes)
363            .await?;
364        self.inner.handle_account_data(updates.account_data);
365
366        Ok(())
367    }
368
369    /// Handle a [`LeftRoomUpdate`].
370    #[instrument(skip_all, fields(room_id = %self.room_id()))]
371    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
372        self.inner.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
373
374        Ok(())
375    }
376
377    /// Get a reference to the [`RoomEventCacheUpdateSender`].
378    pub(in super::super) fn update_sender(&self) -> &RoomEventCacheUpdateSender {
379        &self.inner.update_sender
380    }
381
382    /// Handle a single event from the `SendQueue`.
383    pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
384        self.inner
385            .handle_timeline(
386                Timeline { limited: false, prev_batch: None, events: vec![event] },
387                Vec::new(),
388                BTreeMap::new(),
389            )
390            .await
391    }
392
393    /// Save some events in the event cache, for further retrieval with
394    /// [`Self::event`].
395    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
396        match self.inner.state.write().await {
397            Ok(mut state_guard) => {
398                if let Err(err) = state_guard.save_events(events).await {
399                    warn!("couldn't save event in the event cache: {err}");
400                }
401            }
402
403            Err(err) => {
404                warn!("couldn't save event in the event cache: {err}");
405            }
406        }
407    }
408
409    /// Return a nice debug string (a vector of lines) for the linked chunk of
410    /// events for this room.
411    pub async fn debug_string(&self) -> Vec<String> {
412        match self.inner.state.read().await {
413            Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
414            Err(err) => {
415                warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
416
417                vec![]
418            }
419        }
420    }
421}
422
/// The (non-cloneable) details of the `RoomEventCache`.
pub(super) struct RoomEventCacheInner {
    /// The room id for this room.
    room_id: OwnedRoomId,

    /// Weak reference to the room; upgraded on demand (e.g. for pinned-event
    /// subscriptions), failing with `ClientDropped` if the client is gone.
    pub weak_room: WeakRoom,

    /// State for this room's event cache.
    pub state: LockedRoomEventCacheState,

    /// A notifier that we received a new pagination token.
    pub pagination_batch_token_notifier: Notify,

    /// Observable back-pagination status, shared with pagination types.
    /// NOTE(review): presumably shared with `RoomPagination` — confirm.
    pub shared_pagination_status: SharedObservable<SharedPaginationStatus>,

    /// Sender to the auto-shrink channel.
    ///
    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
    /// more details.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Update sender for this room.
    update_sender: RoomEventCacheUpdateSender,
}
447
impl RoomEventCacheInner {
    /// Creates a new cache for a room, and subscribes to room updates, so as
    /// to handle new timeline events.
    fn new(
        room_id: OwnedRoomId,
        weak_room: WeakRoom,
        state: LockedRoomEventCacheState,
        shared_pagination_status: SharedObservable<SharedPaginationStatus>,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        update_sender: RoomEventCacheUpdateSender,
    ) -> Self {
        Self {
            room_id,
            weak_room,
            state,
            update_sender,
            pagination_batch_token_notifier: Default::default(),
            auto_shrink_sender,
            shared_pagination_status,
        }
    }

    /// Handle room account data events received for this room.
    ///
    /// Only `m.fully_read` (read marker) events are forwarded to observers;
    /// other account data types are ignored, and undeserializable events are
    /// logged and skipped.
    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
        if account_data.is_empty() {
            return;
        }

        // Only the first fully-read event in the batch is forwarded.
        let mut handled_read_marker = false;

        trace!("Handling account data");

        for raw_event in account_data {
            match raw_event.deserialize() {
                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
                    // If duplicated, do not forward read marker multiple times
                    // to avoid clutter the update channel.
                    if handled_read_marker {
                        continue;
                    }

                    handled_read_marker = true;

                    // Propagate to observers. (We ignore the error if there aren't any.)
                    self.update_sender.send(
                        RoomEventCacheUpdate::MoveReadMarkerTo { event_id: ev.content.event_id },
                        None,
                    );
                }

                Ok(_) => {
                    // We're not interested in other room account data updates,
                    // at this point.
                }

                Err(e) => {
                    // Log the raw event type (if any) to help debugging.
                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
                    warn!(event_type, "Failed to deserialize account data: {e}");
                }
            }
        }
    }

    /// Handle a [`Timeline`], i.e. new events received by a sync for this
    /// room.
    async fn handle_timeline(
        &self,
        timeline: Timeline,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        // Nothing to do at all if every input is empty.
        if timeline.events.is_empty()
            && timeline.prev_batch.is_none()
            && ephemeral_events.is_empty()
            && ambiguity_changes.is_empty()
        {
            return Ok(());
        }

        // Add all the events to the backend.
        trace!("adding new events");

        let (stored_prev_batch_token, timeline_event_diffs) =
            self.state.write().await?.handle_sync(timeline, &ephemeral_events).await?;

        // Now that all events have been added, we can trigger the
        // `pagination_token_notifier`.
        if stored_prev_batch_token {
            self.pagination_batch_token_notifier.notify_one();
        }

        // The order matters here: first send the timeline event diffs, then only the
        // related events (read receipts, etc.).
        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Sync,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        if !ephemeral_events.is_empty() {
            self.update_sender
                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events }, None);
        }

        if !ambiguity_changes.is_empty() {
            self.update_sender
                .send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes }, None);
        }

        Ok(())
    }
}
563
/// Where events being post-processed came from.
#[derive(Clone, Copy)]
pub(in super::super) enum PostProcessingOrigin {
    /// Events received via a sync response.
    Sync,
    /// Events received via back-pagination.
    Backpagination,
    /// Events produced by re-decryption (only with the `e2e-encryption`
    /// feature).
    #[cfg(feature = "e2e-encryption")]
    Redecryption,
}
571
#[cfg(test)]
mod tests {
    use matrix_sdk_base::{RoomState, event_cache::Event};
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        RoomId, event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };

    use crate::test_utils::logged_in_client;

    #[async_test]
    async fn test_find_event_by_id_with_edit_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("* An edited event")
                .edit(
                    original_id,
                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
                )
                .event_id(related_id)
                .into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_thread_reply_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_reaction_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.reaction(original_id, ":D").event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_poll_response_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_poll_end_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_filtered_relationships() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the associated related event, which reacts to the related event.
        room_event_cache.save_events([associated_related_event]).await;

        let filter = Some(vec![RelationType::Replacement]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // There's only the edit event (an edit event can't have its own edit event).
        assert_eq!(related_events.len(), 1);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);

        // Now we'll filter threads instead, there should be no related events
        let filter = Some(vec![RelationType::Thread]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");

        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);
        // No Thread related events found
        assert!(related_events.is_empty());
    }

    #[async_test]
    async fn test_find_event_by_id_with_recursive_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the associated related event, which reacts to the related event.
        room_event_cache.save_events([associated_related_event]).await;

        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, None)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // There are both the related id and the associatively related id
        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);
    }

    /// Shared helper: saves `original_event`, an unrelated event, and
    /// `related_event` into a fresh room event cache, then asserts that
    /// `find_event_with_relations` returns the original with exactly the
    /// related event (and not the unrelated one).
    async fn assert_relations(
        room_id: &RoomId,
        original_event: Event,
        related_event: Event,
        event_factory: EventFactory,
    ) {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        let original_event_id = original_event.event_id().unwrap();
        room_event_cache.save_events([original_event]).await;

        // Save an unrelated event to check it's not in the related events list.
        let unrelated_id = event_id!("$2");
        room_event_cache
            .save_events([event_factory
                .text_msg("An unrelated event")
                .event_id(unrelated_id)
                .into()])
            .await;

        // Save the related event.
        let related_id = related_event.event_id().unwrap();
        room_event_cache.save_events([related_event]).await;

        let (event, related_events) = room_event_cache
            .find_event_with_relations(&original_event_id, None)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_event_id);

        // There is only the actually related event in the related ones
        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
    }
}
842
843#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
844mod timed_tests {
845    use std::{ops::Not, sync::Arc};
846
847    use assert_matches::assert_matches;
848    use assert_matches2::assert_let;
849    use eyeball_im::VectorDiff;
850    use futures_util::FutureExt;
851    use matrix_sdk_base::{
852        RoomState,
853        event_cache::{
854            Gap,
855            store::{EventCacheStore as _, MemoryStore},
856        },
857        linked_chunk::{
858            ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
859            lazy_loader::from_all_chunks,
860        },
861        store::StoreConfig,
862        sync::{JoinedRoomUpdate, Timeline},
863    };
864    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
865    use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
866    use ruma::{
867        EventId, event_id,
868        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
869        room_id,
870        serde::Raw,
871        user_id,
872    };
873    use serde_json::json;
874    use tokio::task::yield_now;
875
876    use super::{
877        super::{
878            super::TimelineVectorDiffs, lock::Reload as _,
879            pagination::LoadMoreEventsBackwardsOutcome,
880        },
881        RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
882    };
883    use crate::{assert_let_timeout, test_utils::client::MockClientBuilder};
884
885    #[async_test]
886    async fn test_write_to_storage() {
887        let room_id = room_id!("!galette:saucisse.bzh");
888        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
889
890        let event_cache_store = Arc::new(MemoryStore::new());
891
892        let client = MockClientBuilder::new(None)
893            .on_builder(|builder| {
894                builder.store_config(
895                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
896                        .event_cache_store(event_cache_store.clone()),
897                )
898            })
899            .build()
900            .await;
901
902        let event_cache = client.event_cache();
903
904        // Don't forget to subscribe and like.
905        event_cache.subscribe().unwrap();
906
907        client.base_client().get_or_create_room(room_id, RoomState::Joined);
908        let room = client.get_room(room_id).unwrap();
909
910        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
911        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
912
913        // Propagate an update for a message and a prev-batch token.
914        let timeline = Timeline {
915            limited: true,
916            prev_batch: Some("raclette".to_owned()),
917            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
918        };
919
920        room_event_cache
921            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
922            .await
923            .unwrap();
924
925        // Just checking the generic update is correct.
926        assert_matches!(
927            generic_stream.recv().await,
928            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
929                assert_eq!(expected_room_id, room_id);
930            }
931        );
932        assert!(generic_stream.is_empty());
933
934        // Check the storage.
935        let linked_chunk = from_all_chunks::<3, _, _>(
936            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
937        )
938        .unwrap()
939        .unwrap();
940
941        assert_eq!(linked_chunk.chunks().count(), 2);
942
943        let mut chunks = linked_chunk.chunks();
944
945        // We start with the gap.
946        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
947            assert_eq!(gap.token, "raclette");
948        });
949
950        // Then we have the stored event.
951        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
952            assert_eq!(events.len(), 1);
953            let deserialized = events[0].raw().deserialize().unwrap();
954            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
955            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
956        });
957
958        // That's all, folks!
959        assert!(chunks.next().is_none());
960    }
961
    /// When an event carries bundled relations (here, a bundled edit), the
    /// in-memory copy keeps them while the copy written to the event cache
    /// store is stripped of them.
    #[async_test]
    async fn test_write_to_storage_strips_bundled_relations() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        // Build a client backed by our in-memory store, so we can inspect what
        // gets persisted.
        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Propagate an update for a message with bundled relations.
        let ev = f
            .text_msg("hey yo")
            .sender(*ALICE)
            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
            .into_event();

        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };

        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        // The in-memory linked chunk keeps the bundled relation.
        {
            let events = room_event_cache.events().await.unwrap();

            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(
                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
            );

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            // The bundled edit is still attached in memory.
            assert!(original.unsigned.relations.replace.is_some());
        }

        // The one in storage does not.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 1);

        let mut chunks = linked_chunk.chunks();
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            // The persisted copy has been stripped of the bundled edit.
            assert!(original.unsigned.relations.replace.is_none());
        });

        // That's all, folks!
        assert!(chunks.next().is_none());
    }
1053
    /// Clearing a room's event cache empties both the in-memory linked chunk
    /// and the persisted one, while individual events remain findable by id.
    #[async_test]
    async fn test_clear() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { token: "comté".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // The room knows about all cached events.
        {
            assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
            assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
        }

        // But only part of events are loaded from the store (lazy-loading).
        {
            // The room must contain only one event because only one chunk has been loaded.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].event_id().unwrap(), event_id2);

            assert!(stream.is_empty());
        }

        // Let's load more chunks to load all events.
        {
            room_event_cache.pagination().run_backwards_once(20).await.unwrap();

            assert_let_timeout!(
                Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                    stream.recv()
            );
            assert_eq!(diffs.len(), 1);
            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
                // Here you are `event_id1`!
                assert_eq!(event.event_id().unwrap(), event_id1);
            });

            assert!(stream.is_empty());

            assert_let_timeout!(
                Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
                    generic_stream.recv()
            );
            assert_eq!(room_id, expected_room_id);
            assert!(generic_stream.is_empty());
        }

        // After clearing,…
        room_event_cache.clear().await.unwrap();

        //… we get an update that the content has been cleared.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_let!(VectorDiff::Clear = &diffs[0]);

        // … same with a generic update.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
        );
        assert_eq!(received_room_id, room_id);
        assert!(generic_stream.is_empty());

        // Events individually are not forgotten by the event cache, after clearing a
        // room.
        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());

        // But their presence in a linked chunk is forgotten.
        let items = room_event_cache.events().await.unwrap();
        assert!(items.is_empty());

        // The event cache store too.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        // Note: while the event cache store could return `None` here, clearing it will
        // reset it to its initial form, maintaining the invariant that it
        // contains a single items chunk that's empty.
        assert_eq!(linked_chunk.num_items(), 0);
    }
1211
    /// A room event cache created over a prefilled store lazily loads only the
    /// last chunk, can paginate the rest back in, and deduplicates events that
    /// come in again via sync.
    #[async_test]
    async fn test_load_from_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { token: "cheddar".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        // Let's check whether the generic updates are received for the initialisation.
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // The room event cache has been loaded. A generic update must have been
        // triggered.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );
        assert!(generic_stream.is_empty());

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();

        // The initial items contain one event because only the last chunk is loaded by
        // default.
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].event_id().unwrap(), event_id2);
        assert!(stream.is_empty());

        // The event cache knows only all events though, even if they aren't loaded.
        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
        assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());

        // Let's paginate to load more events.
        room_event_cache.pagination().run_backwards_once(20).await.unwrap();

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
            assert_eq!(event.event_id().unwrap(), event_id1);
        });

        assert!(stream.is_empty());

        // A generic update is triggered too.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        // A new update with one of these events leads to deduplication.
        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };

        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct. There is a duplicate event, so
        // no generic changes whatsoever!
        assert!(generic_stream.recv().now_or_never().is_none());

        // The stream doesn't report these changes *yet*. Use the items vector given
        // when subscribing, to check that the items correspond to their new
        // positions. The duplicated item is removed (so it's not the first
        // element anymore), and it's added to the back of the list.
        let items = room_event_cache.events().await.unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].event_id().unwrap(), event_id1);
        assert_eq!(items[1].event_id().unwrap(), event_id2);
    }
1358
1359    #[async_test]
1360    async fn test_load_from_storage_resilient_to_failure() {
1361        let room_id = room_id!("!fondue:patate.ch");
1362        let event_cache_store = Arc::new(MemoryStore::new());
1363
1364        let event = EventFactory::new()
1365            .room(room_id)
1366            .sender(user_id!("@ben:saucisse.bzh"))
1367            .text_msg("foo")
1368            .event_id(event_id!("$42"))
1369            .into_event();
1370
1371        // Prefill the store with invalid data: two chunks that form a cycle.
1372        event_cache_store
1373            .handle_linked_chunk_updates(
1374                LinkedChunkId::Room(room_id),
1375                vec![
1376                    Update::NewItemsChunk {
1377                        previous: None,
1378                        new: ChunkIdentifier::new(0),
1379                        next: None,
1380                    },
1381                    Update::PushItems {
1382                        at: Position::new(ChunkIdentifier::new(0), 0),
1383                        items: vec![event],
1384                    },
1385                    Update::NewItemsChunk {
1386                        previous: Some(ChunkIdentifier::new(0)),
1387                        new: ChunkIdentifier::new(1),
1388                        next: Some(ChunkIdentifier::new(0)),
1389                    },
1390                ],
1391            )
1392            .await
1393            .unwrap();
1394
1395        let client = MockClientBuilder::new(None)
1396            .on_builder(|builder| {
1397                builder.store_config(
1398                    StoreConfig::new(CrossProcessLockConfig::multi_process("holder"))
1399                        .event_cache_store(event_cache_store.clone()),
1400                )
1401            })
1402            .build()
1403            .await;
1404
1405        let event_cache = client.event_cache();
1406
1407        // Don't forget to subscribe and like.
1408        event_cache.subscribe().unwrap();
1409
1410        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1411        let room = client.get_room(room_id).unwrap();
1412
1413        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1414
1415        let items = room_event_cache.events().await.unwrap();
1416
1417        // Because the persisted content was invalid, the room store is reset: there are
1418        // no events in the cache.
1419        assert!(items.is_empty());
1420
1421        // Storage doesn't contain anything. It would also be valid that it contains a
1422        // single initial empty items chunk.
1423        let raw_chunks =
1424            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
1425        assert!(raw_chunks.is_empty());
1426    }
1427
    /// A limited sync with a prev-batch token must not pile up useless gap
    /// chunks: the gap only becomes visible once more of the chunk is
    /// reloaded, and a subsequent non-limited sync adds no new gap.
    #[async_test]
    async fn test_no_useless_gaps() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        let f = EventFactory::new().room(room_id).sender(*ALICE);

        // Propagate an update including a limited timeline with one message and a
        // prev-batch token.
        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: true,
                    prev_batch: Some("raclette".to_owned()),
                    events: vec![f.text_msg("hey yo").into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        // Count gaps and events currently loaded in the in-memory linked chunk.
        {
            let state = room_event_cache.inner.state.read().await.unwrap();

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            // The limited sync unloads the chunk, so it will appear as if there are only
            // the events.
            assert_eq!(num_gaps, 0);
            assert_eq!(num_events, 1);
        }

        // But if I manually reload more of the chunk, the gap will be present.
        assert_matches!(
            room_event_cache.pagination().load_more_events_backwards().await.unwrap(),
            LoadMoreEventsBackwardsOutcome::Gap { .. }
        );

        // Count again after the reload.
        {
            let state = room_event_cache.inner.state.read().await.unwrap();

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            // The gap must have been stored.
            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 1);
        }

        // Now, propagate an update for another message, but the timeline isn't limited
        // this time.
        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: false,
                    prev_batch: Some("fondue".to_owned()),
                    events: vec![f.text_msg("sup").into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        // Count one final time: the non-limited sync must not have added a gap.
        {
            let state = room_event_cache.inner.state.read().await.unwrap();

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(gap) => {
                        // The only gap left is the original "raclette" one; the
                        // "fondue" token did not create a new gap.
                        assert_eq!(gap.token, "raclette");
                        num_gaps += 1;
                    }
                }
            }

            // There's only the previous gap, no new ones.
            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 2);
        }
    }
1554
1555    #[async_test]
1556    async fn test_shrink_to_last_chunk() {
1557        let room_id = room_id!("!galette:saucisse.bzh");
1558
1559        let client = MockClientBuilder::new(None).build().await;
1560
1561        let f = EventFactory::new().room(room_id);
1562
1563        let evid1 = event_id!("$1");
1564        let evid2 = event_id!("$2");
1565
1566        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1567        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1568
1569        // Fill the event cache store with an initial linked chunk with 2 events chunks.
1570        {
1571            client
1572                .event_cache_store()
1573                .lock()
1574                .await
1575                .expect("Could not acquire the event cache lock")
1576                .as_clean()
1577                .expect("Could not acquire a clean event cache lock")
1578                .handle_linked_chunk_updates(
1579                    LinkedChunkId::Room(room_id),
1580                    vec![
1581                        Update::NewItemsChunk {
1582                            previous: None,
1583                            new: ChunkIdentifier::new(0),
1584                            next: None,
1585                        },
1586                        Update::PushItems {
1587                            at: Position::new(ChunkIdentifier::new(0), 0),
1588                            items: vec![ev1],
1589                        },
1590                        Update::NewItemsChunk {
1591                            previous: Some(ChunkIdentifier::new(0)),
1592                            new: ChunkIdentifier::new(1),
1593                            next: None,
1594                        },
1595                        Update::PushItems {
1596                            at: Position::new(ChunkIdentifier::new(1), 0),
1597                            items: vec![ev2],
1598                        },
1599                    ],
1600                )
1601                .await
1602                .unwrap();
1603        }
1604
1605        let event_cache = client.event_cache();
1606        event_cache.subscribe().unwrap();
1607
1608        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1609        let room = client.get_room(room_id).unwrap();
1610        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1611
1612        // Sanity check: lazily loaded, so only includes one item at start.
1613        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
1614        assert_eq!(events.len(), 1);
1615        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1616        assert!(stream.is_empty());
1617
1618        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1619
1620        // Force loading the full linked chunk by back-paginating.
1621        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1622        assert_eq!(outcome.events.len(), 1);
1623        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1624        assert!(outcome.reached_start);
1625
1626        // We also get an update about the loading from the store.
1627        assert_let_timeout!(
1628            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1629                stream.recv()
1630        );
1631        assert_eq!(diffs.len(), 1);
1632        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1633            assert_eq!(value.event_id().as_deref(), Some(evid1));
1634        });
1635
1636        assert!(stream.is_empty());
1637
1638        // Same for the generic update.
1639        assert_let_timeout!(
1640            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1641        );
1642        assert_eq!(expected_room_id, room_id);
1643        assert!(generic_stream.is_empty());
1644
1645        // Shrink the linked chunk to the last chunk.
1646        room_event_cache
1647            .inner
1648            .state
1649            .write()
1650            .await
1651            .unwrap()
1652            .reload()
1653            .await
1654            .expect("shrinking should succeed");
1655
1656        // We receive updates about the changes to the linked chunk.
1657        assert_let_timeout!(
1658            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1659                stream.recv()
1660        );
1661        assert_eq!(diffs.len(), 2);
1662        assert_matches!(&diffs[0], VectorDiff::Clear);
1663        assert_matches!(&diffs[1], VectorDiff::Append { values} => {
1664            assert_eq!(values.len(), 1);
1665            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
1666        });
1667
1668        assert!(stream.is_empty());
1669
1670        // A generic update has been received.
1671        assert_let_timeout!(Ok(RoomEventCacheGenericUpdate { .. }) = generic_stream.recv());
1672        assert!(generic_stream.is_empty());
1673
1674        // When reading the events, we do get only the last one.
1675        let events = room_event_cache.events().await.unwrap();
1676        assert_eq!(events.len(), 1);
1677        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1678
1679        // But if we back-paginate, we don't need access to network to find out about
1680        // the previous event.
1681        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1682        assert_eq!(outcome.events.len(), 1);
1683        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1684        assert!(outcome.reached_start);
1685    }
1686
    // Checks that `event_order` gives a stable, global ordering for events,
    // whether or not their chunk is currently loaded in memory, and that a
    // deduplicated event loses the order associated with its old position.
    #[async_test]
    async fn test_room_ordering() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id).sender(*ALICE);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");
        let evid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
        let ev3 = f.text_msg("yo").event_id(evid3).into_event();

        // Fill the event cache store with an initial linked chunk with 2 events chunks.
        // Chunk 0 holds [ev1, ev2]; chunk 1 holds [ev3]. Only the last chunk is
        // lazily loaded when the room cache starts.
        {
            client
                .event_cache_store()
                .lock()
                .await
                .expect("Could not acquire the event cache lock")
                .as_clean()
                .expect("Could not acquire a clean event cache lock")
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1, ev2],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            // Cloned because ev3 is re-sent in the gappy sync below.
                            items: vec![ev3.clone()],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Initially, the linked chunk only contains the last chunk, so only ev3 is
        // loaded.
        {
            let state = room_event_cache.inner.state.read().await.unwrap();
            let room_linked_chunk = state.room_linked_chunk();

            // But we can get the order of ev1 (not loaded, known from storage metadata).
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
                Some(0)
            );

            // And that of ev2 as well.
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
                Some(1)
            );

            // ev3, which is loaded, also has a known ordering.
            let mut events = room_linked_chunk.events();
            let (pos, ev) = events.next().unwrap();
            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
            assert_eq!(ev.event_id().as_deref(), Some(evid3));
            assert_eq!(room_linked_chunk.event_order(pos), Some(2));

            // No other loaded events.
            assert!(events.next().is_none());
        }

        // Force loading the full linked chunk by back-paginating.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert!(outcome.reached_start);

        // All events are now loaded, so their order is precisely their enumerated index
        // in a linear iteration.
        {
            let state = room_event_cache.inner.state.read().await.unwrap();
            let room_linked_chunk = state.room_linked_chunk();

            for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
                assert_eq!(room_linked_chunk.event_order(pos), Some(i));
            }
        }

        // Handle a gappy sync with two events (including one duplicate, so
        // deduplication kicks in), so that the linked chunk is shrunk to the
        // last chunk, and that the linked chunk only contains the last two
        // events.
        let evid4 = event_id!("$4");
        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: true,
                    prev_batch: Some("fondue".to_owned()),
                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        {
            let state = room_event_cache.inner.state.read().await.unwrap();
            let room_linked_chunk = state.room_linked_chunk();

            // After the shrink, only evid3 and evid4 are loaded.
            let mut events = room_linked_chunk.events();

            // ev3 keeps its global order (2) at its new position.
            let (pos, ev) = events.next().unwrap();
            assert_eq!(ev.event_id().as_deref(), Some(evid3));
            assert_eq!(room_linked_chunk.event_order(pos), Some(2));

            let (pos, ev) = events.next().unwrap();
            assert_eq!(ev.event_id().as_deref(), Some(evid4));
            assert_eq!(room_linked_chunk.event_order(pos), Some(3));

            // No other loaded events.
            assert!(events.next().is_none());

            // But we can still get the order of previous events.
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
                Some(0)
            );
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
                Some(1)
            );

            // ev3 doesn't have an order with its previous position, since it's been
            // deduplicated.
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
                None
            );
        }
    }
1843
1844    #[async_test]
1845    async fn test_auto_shrink_after_all_subscribers_are_gone() {
1846        let room_id = room_id!("!galette:saucisse.bzh");
1847
1848        let client = MockClientBuilder::new(None).build().await;
1849
1850        let f = EventFactory::new().room(room_id);
1851
1852        let evid1 = event_id!("$1");
1853        let evid2 = event_id!("$2");
1854
1855        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1856        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1857
1858        // Fill the event cache store with an initial linked chunk with 2 events chunks.
1859        {
1860            client
1861                .event_cache_store()
1862                .lock()
1863                .await
1864                .expect("Could not acquire the event cache lock")
1865                .as_clean()
1866                .expect("Could not acquire a clean event cache lock")
1867                .handle_linked_chunk_updates(
1868                    LinkedChunkId::Room(room_id),
1869                    vec![
1870                        Update::NewItemsChunk {
1871                            previous: None,
1872                            new: ChunkIdentifier::new(0),
1873                            next: None,
1874                        },
1875                        Update::PushItems {
1876                            at: Position::new(ChunkIdentifier::new(0), 0),
1877                            items: vec![ev1],
1878                        },
1879                        Update::NewItemsChunk {
1880                            previous: Some(ChunkIdentifier::new(0)),
1881                            new: ChunkIdentifier::new(1),
1882                            next: None,
1883                        },
1884                        Update::PushItems {
1885                            at: Position::new(ChunkIdentifier::new(1), 0),
1886                            items: vec![ev2],
1887                        },
1888                    ],
1889                )
1890                .await
1891                .unwrap();
1892        }
1893
1894        let event_cache = client.event_cache();
1895        event_cache.subscribe().unwrap();
1896
1897        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1898        let room = client.get_room(room_id).unwrap();
1899        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1900
1901        // Sanity check: lazily loaded, so only includes one item at start.
1902        let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
1903        assert_eq!(events1.len(), 1);
1904        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
1905        assert!(stream1.is_empty());
1906
1907        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1908
1909        // Force loading the full linked chunk by back-paginating.
1910        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1911        assert_eq!(outcome.events.len(), 1);
1912        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1913        assert!(outcome.reached_start);
1914
1915        // We also get an update about the loading from the store. Ignore it, for this
1916        // test's sake.
1917        assert_let_timeout!(
1918            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1919                stream1.recv()
1920        );
1921        assert_eq!(diffs.len(), 1);
1922        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1923            assert_eq!(value.event_id().as_deref(), Some(evid1));
1924        });
1925
1926        assert!(stream1.is_empty());
1927
1928        assert_let_timeout!(
1929            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1930        );
1931        assert_eq!(expected_room_id, room_id);
1932        assert!(generic_stream.is_empty());
1933
1934        // Have another subscriber.
1935        // Since it's not the first one, and the previous one loaded some more events,
1936        // the second subscribers sees them all.
1937        let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
1938        assert_eq!(events2.len(), 2);
1939        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
1940        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
1941        assert!(stream2.is_empty());
1942
1943        // Drop the first stream, and wait a bit.
1944        drop(stream1);
1945        yield_now().await;
1946
1947        // The second stream remains undisturbed.
1948        assert!(stream2.is_empty());
1949
1950        // Now drop the second stream, and wait a bit.
1951        drop(stream2);
1952        yield_now().await;
1953
1954        // The linked chunk must have auto-shrunk by now.
1955
1956        {
1957            // Check the inner state: there's no more shared auto-shrinker.
1958            let state = room_event_cache.inner.state.read().await.unwrap();
1959            assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
1960        }
1961
1962        // Getting the events will only give us the latest chunk.
1963        let events3 = room_event_cache.events().await.unwrap();
1964        assert_eq!(events3.len(), 1);
1965        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
1966    }
1967
1968    #[async_test]
1969    async fn test_rfind_map_event_in_memory_by() {
1970        let user_id = user_id!("@mnt_io:matrix.org");
1971        let room_id = room_id!("!raclette:patate.ch");
1972        let client = MockClientBuilder::new(None).build().await;
1973
1974        let event_factory = EventFactory::new().room(room_id);
1975
1976        let event_id_0 = event_id!("$ev0");
1977        let event_id_1 = event_id!("$ev1");
1978        let event_id_2 = event_id!("$ev2");
1979        let event_id_3 = event_id!("$ev3");
1980
1981        let event_0 =
1982            event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
1983        let event_1 =
1984            event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
1985        let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
1986        let event_3 =
1987            event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
1988
1989        // Fill the event cache store with an initial linked chunk of 2 chunks, and 4
1990        // events.
1991        {
1992            client
1993                .event_cache_store()
1994                .lock()
1995                .await
1996                .expect("Could not acquire the event cache lock")
1997                .as_clean()
1998                .expect("Could not acquire a clean event cache lock")
1999                .handle_linked_chunk_updates(
2000                    LinkedChunkId::Room(room_id),
2001                    vec![
2002                        Update::NewItemsChunk {
2003                            previous: None,
2004                            new: ChunkIdentifier::new(0),
2005                            next: None,
2006                        },
2007                        Update::PushItems {
2008                            at: Position::new(ChunkIdentifier::new(0), 0),
2009                            items: vec![event_3],
2010                        },
2011                        Update::NewItemsChunk {
2012                            previous: Some(ChunkIdentifier::new(0)),
2013                            new: ChunkIdentifier::new(1),
2014                            next: None,
2015                        },
2016                        Update::PushItems {
2017                            at: Position::new(ChunkIdentifier::new(1), 0),
2018                            items: vec![event_0, event_1, event_2],
2019                        },
2020                    ],
2021                )
2022                .await
2023                .unwrap();
2024        }
2025
2026        let event_cache = client.event_cache();
2027        event_cache.subscribe().unwrap();
2028
2029        client.base_client().get_or_create_room(room_id, RoomState::Joined);
2030        let room = client.get_room(room_id).unwrap();
2031        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2032
2033        // Look for an event from `BOB`: it must be `event_0`.
2034        assert_matches!(
2035            room_event_cache
2036                .rfind_map_event_in_memory_by(|event| {
2037                    (event.sender().as_deref() == Some(*BOB)).then(|| event.event_id())
2038                })
2039                .await,
2040            Ok(Some(event_id)) => {
2041                assert_eq!(event_id.as_deref(), Some(event_id_0));
2042            }
2043        );
2044
2045        // Look for an event from `ALICE`: it must be `event_2`, right before `event_1`
2046        // because events are looked for in reverse order.
2047        assert_matches!(
2048            room_event_cache
2049                .rfind_map_event_in_memory_by(|event| {
2050                    (event.sender().as_deref() == Some(*ALICE)).then(|| event.event_id())
2051                })
2052                .await,
2053            Ok(Some(event_id)) => {
2054                assert_eq!(event_id.as_deref(), Some(event_id_2));
2055            }
2056        );
2057
2058        // Look for an event that is inside the storage, but not loaded.
2059        assert!(
2060            room_event_cache
2061                .rfind_map_event_in_memory_by(|event| {
2062                    (event.sender().as_deref() == Some(user_id)).then(|| event.event_id())
2063                })
2064                .await
2065                .unwrap()
2066                .is_none()
2067        );
2068
2069        // Look for an event that doesn't exist.
2070        assert!(
2071            room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap().is_none()
2072        );
2073    }
2074
2075    #[async_test]
2076    async fn test_reload_when_dirty() {
2077        let user_id = user_id!("@mnt_io:matrix.org");
2078        let room_id = room_id!("!raclette:patate.ch");
2079
2080        // The storage shared by the two clients.
2081        let event_cache_store = MemoryStore::new();
2082
2083        // Client for the process 0.
2084        let client_p0 = MockClientBuilder::new(None)
2085            .on_builder(|builder| {
2086                builder.store_config(
2087                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2088                        .event_cache_store(event_cache_store.clone()),
2089                )
2090            })
2091            .build()
2092            .await;
2093
2094        // Client for the process 1.
2095        let client_p1 = MockClientBuilder::new(None)
2096            .on_builder(|builder| {
2097                builder.store_config(
2098                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2099                        .event_cache_store(event_cache_store),
2100                )
2101            })
2102            .build()
2103            .await;
2104
2105        let event_factory = EventFactory::new().room(room_id).sender(user_id);
2106
2107        let ev_id_0 = event_id!("$ev_0");
2108        let ev_id_1 = event_id!("$ev_1");
2109
2110        let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
2111        let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
2112
2113        // Add events to the storage (shared by the two clients!).
2114        client_p0
2115            .event_cache_store()
2116            .lock()
2117            .await
2118            .expect("[p0] Could not acquire the event cache lock")
2119            .as_clean()
2120            .expect("[p0] Could not acquire a clean event cache lock")
2121            .handle_linked_chunk_updates(
2122                LinkedChunkId::Room(room_id),
2123                vec![
2124                    Update::NewItemsChunk {
2125                        previous: None,
2126                        new: ChunkIdentifier::new(0),
2127                        next: None,
2128                    },
2129                    Update::PushItems {
2130                        at: Position::new(ChunkIdentifier::new(0), 0),
2131                        items: vec![ev_0],
2132                    },
2133                    Update::NewItemsChunk {
2134                        previous: Some(ChunkIdentifier::new(0)),
2135                        new: ChunkIdentifier::new(1),
2136                        next: None,
2137                    },
2138                    Update::PushItems {
2139                        at: Position::new(ChunkIdentifier::new(1), 0),
2140                        items: vec![ev_1],
2141                    },
2142                ],
2143            )
2144            .await
2145            .unwrap();
2146
2147        // Subscribe the event caches, and create the room.
2148        let (room_event_cache_p0, room_event_cache_p1) = {
2149            let event_cache_p0 = client_p0.event_cache();
2150            event_cache_p0.subscribe().unwrap();
2151
2152            let event_cache_p1 = client_p1.event_cache();
2153            event_cache_p1.subscribe().unwrap();
2154
2155            client_p0.base_client().get_or_create_room(room_id, RoomState::Joined);
2156            client_p1.base_client().get_or_create_room(room_id, RoomState::Joined);
2157
2158            let (room_event_cache_p0, _drop_handles) =
2159                client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
2160            let (room_event_cache_p1, _drop_handles) =
2161                client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
2162
2163            (room_event_cache_p0, room_event_cache_p1)
2164        };
2165
2166        // Okay. We are ready for the test!
2167        //
2168        // First off, let's check `room_event_cache_p0` has access to the first event
2169        // loaded in-memory, then do a pagination, and see more events.
2170        let mut updates_stream_p0 = {
2171            let room_event_cache = &room_event_cache_p0;
2172
2173            let (initial_updates, mut updates_stream) =
2174                room_event_cache_p0.subscribe().await.unwrap();
2175
2176            // Initial updates contain `ev_id_1` only.
2177            assert_eq!(initial_updates.len(), 1);
2178            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2179            assert!(updates_stream.is_empty());
2180
2181            // `ev_id_1` must be loaded in memory.
2182            assert!(event_loaded(room_event_cache, ev_id_1).await);
2183
2184            // `ev_id_0` must NOT be loaded in memory.
2185            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2186
2187            // Load one more event with a backpagination.
2188            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2189
2190            // A new update for `ev_id_0` must be present.
2191            assert_matches!(
2192                updates_stream.recv().await.unwrap(),
2193                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2194                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
2195                    assert_matches!(
2196                        &diffs[0],
2197                        VectorDiff::Insert { index: 0, value: event } => {
2198                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2199                        }
2200                    );
2201                }
2202            );
2203
2204            // `ev_id_0` must now be loaded in memory.
2205            assert!(event_loaded(room_event_cache, ev_id_0).await);
2206
2207            updates_stream
2208        };
2209
2210        // Second, let's check `room_event_cache_p1` has the same accesses.
2211        let mut updates_stream_p1 = {
2212            let room_event_cache = &room_event_cache_p1;
2213            let (initial_updates, mut updates_stream) =
2214                room_event_cache_p1.subscribe().await.unwrap();
2215
2216            // Initial updates contain `ev_id_1` only.
2217            assert_eq!(initial_updates.len(), 1);
2218            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2219            assert!(updates_stream.is_empty());
2220
2221            // `ev_id_1` must be loaded in memory.
2222            assert!(event_loaded(room_event_cache, ev_id_1).await);
2223
2224            // `ev_id_0` must NOT be loaded in memory.
2225            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2226
2227            // Load one more event with a backpagination.
2228            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2229
2230            // A new update for `ev_id_0` must be present.
2231            assert_matches!(
2232                updates_stream.recv().await.unwrap(),
2233                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2234                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
2235                    assert_matches!(
2236                        &diffs[0],
2237                        VectorDiff::Insert { index: 0, value: event } => {
2238                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2239                        }
2240                    );
2241                }
2242            );
2243
2244            // `ev_id_0` must now be loaded in memory.
2245            assert!(event_loaded(room_event_cache, ev_id_0).await);
2246
2247            updates_stream
2248        };
2249
2250        // Do this a couple times, for the fun.
2251        for _ in 0..3 {
2252            // Third, because `room_event_cache_p1` has locked the store, the lock
2253            // is dirty for `room_event_cache_p0`, so it will shrink to its last
2254            // chunk!
2255            {
2256                let room_event_cache = &room_event_cache_p0;
2257                let updates_stream = &mut updates_stream_p0;
2258
2259                // `ev_id_1` must be loaded in memory, just like before.
2260                assert!(event_loaded(room_event_cache, ev_id_1).await);
2261
2262                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
2263                // state has been reloaded to its last chunk.
2264                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2265
2266                // The reload can be observed via the updates too.
2267                assert_matches!(
2268                    updates_stream.recv().await.unwrap(),
2269                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2270                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2271                        assert_matches!(&diffs[0], VectorDiff::Clear);
2272                        assert_matches!(
2273                            &diffs[1],
2274                            VectorDiff::Append { values: events } => {
2275                                assert_eq!(events.len(), 1);
2276                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2277                            }
2278                        );
2279                    }
2280                );
2281
2282                // Load one more event with a backpagination.
2283                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2284
2285                // `ev_id_0` must now be loaded in memory.
2286                assert!(event_loaded(room_event_cache, ev_id_0).await);
2287
2288                // The pagination can be observed via the updates too.
2289                assert_matches!(
2290                    updates_stream.recv().await.unwrap(),
2291                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2292                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2293                        assert_matches!(
2294                            &diffs[0],
2295                            VectorDiff::Insert { index: 0, value: event } => {
2296                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2297                            }
2298                        );
2299                    }
2300                );
2301            }
2302
2303            // Fourth, because `room_event_cache_p0` has locked the store again, the lock
            // is dirty for `room_event_cache_p1` too, so it will shrink to its last
2305            // chunk!
2306            {
2307                let room_event_cache = &room_event_cache_p1;
2308                let updates_stream = &mut updates_stream_p1;
2309
2310                // `ev_id_1` must be loaded in memory, just like before.
2311                assert!(event_loaded(room_event_cache, ev_id_1).await);
2312
2313                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
2314                // state has shrunk to its last chunk.
2315                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2316
2317                // The reload can be observed via the updates too.
2318                assert_matches!(
2319                    updates_stream.recv().await.unwrap(),
2320                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2321                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2322                        assert_matches!(&diffs[0], VectorDiff::Clear);
2323                        assert_matches!(
2324                            &diffs[1],
2325                            VectorDiff::Append { values: events } => {
2326                                assert_eq!(events.len(), 1);
2327                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2328                            }
2329                        );
2330                    }
2331                );
2332
2333                // Load one more event with a backpagination.
2334                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2335
2336                // `ev_id_0` must now be loaded in memory.
2337                assert!(event_loaded(room_event_cache, ev_id_0).await);
2338
2339                // The pagination can be observed via the updates too.
2340                assert_matches!(
2341                    updates_stream.recv().await.unwrap(),
2342                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2343                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2344                        assert_matches!(
2345                            &diffs[0],
2346                            VectorDiff::Insert { index: 0, value: event } => {
2347                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2348                            }
2349                        );
2350                    }
2351                );
2352            }
2353        }
2354
2355        // Repeat that with an explicit read lock (so that we don't rely on
2356        // `event_loaded` to trigger the dirty detection).
2357        for _ in 0..3 {
2358            {
2359                let room_event_cache = &room_event_cache_p0;
2360                let updates_stream = &mut updates_stream_p0;
2361
2362                let guard = room_event_cache.inner.state.read().await.unwrap();
2363
2364                // Guard is kept alive, to ensure we can have multiple read guards alive with a
2365                // shared access.
2366                // See `RoomEventCacheStateLock::read` to learn more.
2367
2368                // The lock is no longer marked as dirty, it's been cleaned.
2369                assert!(guard.is_dirty().not());
2370
2371                // The reload can be observed via the updates too.
2372                assert_matches!(
2373                    updates_stream.recv().await.unwrap(),
2374                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2375                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2376                        assert_matches!(&diffs[0], VectorDiff::Clear);
2377                        assert_matches!(
2378                            &diffs[1],
2379                            VectorDiff::Append { values: events } => {
2380                                assert_eq!(events.len(), 1);
2381                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2382                            }
2383                        );
2384                    }
2385                );
2386
2387                assert!(event_loaded(room_event_cache, ev_id_1).await);
2388                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2389
2390                // Ensure `guard` is alive up to this point (in case this test is refactored, I
2391                // want to make this super explicit).
2392                //
                // We need to drop it before the pagination because the pagination needs to
2394                // obtain a write lock.
2395                drop(guard);
2396
2397                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2398                assert!(event_loaded(room_event_cache, ev_id_0).await);
2399
2400                // The pagination can be observed via the updates too.
2401                assert_matches!(
2402                    updates_stream.recv().await.unwrap(),
2403                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2404                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2405                        assert_matches!(
2406                            &diffs[0],
2407                            VectorDiff::Insert { index: 0, value: event } => {
2408                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2409                            }
2410                        );
2411                    }
2412                );
2413            }
2414
2415            {
2416                let room_event_cache = &room_event_cache_p1;
2417                let updates_stream = &mut updates_stream_p1;
2418
2419                let guard = room_event_cache.inner.state.read().await.unwrap();
2420
2421                // Guard is kept alive, to ensure we can have multiple read guards alive with a
2422                // shared access.
2423
2424                // The lock is no longer marked as dirty, it's been cleaned.
2425                assert!(guard.is_dirty().not());
2426
2427                // The reload can be observed via the updates too.
2428                assert_matches!(
2429                    updates_stream.recv().await.unwrap(),
2430                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2431                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2432                        assert_matches!(&diffs[0], VectorDiff::Clear);
2433                        assert_matches!(
2434                            &diffs[1],
2435                            VectorDiff::Append { values: events } => {
2436                                assert_eq!(events.len(), 1);
2437                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2438                            }
2439                        );
2440                    }
2441                );
2442
2443                assert!(event_loaded(room_event_cache, ev_id_1).await);
2444                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2445
2446                // Ensure `guard` is alive up to this point (in case this test is refactored, I
2447                // want to make this super explicit).
2448                //
                // We need to drop it before the pagination because the pagination needs to
2450                // obtain a write lock.
2451                drop(guard);
2452
2453                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2454                assert!(event_loaded(room_event_cache, ev_id_0).await);
2455
2456                // The pagination can be observed via the updates too.
2457                assert_matches!(
2458                    updates_stream.recv().await.unwrap(),
2459                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2460                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2461                        assert_matches!(
2462                            &diffs[0],
2463                            VectorDiff::Insert { index: 0, value: event } => {
2464                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2465                            }
2466                        );
2467                    }
2468                );
2469            }
2470        }
2471
2472        // Repeat that with an explicit write lock.
2473        for _ in 0..3 {
2474            {
2475                let room_event_cache = &room_event_cache_p0;
2476                let updates_stream = &mut updates_stream_p0;
2477
2478                let guard = room_event_cache.inner.state.write().await.unwrap();
2479
2480                // The lock is no longer marked as dirty, it's been cleaned.
2481                assert!(guard.is_dirty().not());
2482
2483                // The reload can be observed via the updates too.
2484                assert_matches!(
2485                    updates_stream.recv().await.unwrap(),
2486                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2487                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2488                        assert_matches!(&diffs[0], VectorDiff::Clear);
2489                        assert_matches!(
2490                            &diffs[1],
2491                            VectorDiff::Append { values: events } => {
2492                                assert_eq!(events.len(), 1);
2493                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2494                            }
2495                        );
2496                    }
2497                );
2498
2499                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
2500                // needs to obtain a read lock.
2501                drop(guard);
2502
2503                assert!(event_loaded(room_event_cache, ev_id_1).await);
2504                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2505
2506                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2507                assert!(event_loaded(room_event_cache, ev_id_0).await);
2508
2509                // The pagination can be observed via the updates too.
2510                assert_matches!(
2511                    updates_stream.recv().await.unwrap(),
2512                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2513                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2514                        assert_matches!(
2515                            &diffs[0],
2516                            VectorDiff::Insert { index: 0, value: event } => {
2517                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2518                            }
2519                        );
2520                    }
2521                );
2522            }
2523
2524            {
2525                let room_event_cache = &room_event_cache_p1;
2526                let updates_stream = &mut updates_stream_p1;
2527
2528                let guard = room_event_cache.inner.state.write().await.unwrap();
2529
2530                // The lock is no longer marked as dirty, it's been cleaned.
2531                assert!(guard.is_dirty().not());
2532
2533                // The reload can be observed via the updates too.
2534                assert_matches!(
2535                    updates_stream.recv().await.unwrap(),
2536                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2537                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2538                        assert_matches!(&diffs[0], VectorDiff::Clear);
2539                        assert_matches!(
2540                            &diffs[1],
2541                            VectorDiff::Append { values: events } => {
2542                                assert_eq!(events.len(), 1);
2543                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2544                            }
2545                        );
2546                    }
2547                );
2548
2549                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
2550                // needs to obtain a read lock.
2551                drop(guard);
2552
2553                assert!(event_loaded(room_event_cache, ev_id_1).await);
2554                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2555
2556                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2557                assert!(event_loaded(room_event_cache, ev_id_0).await);
2558
2559                // The pagination can be observed via the updates too.
2560                assert_matches!(
2561                    updates_stream.recv().await.unwrap(),
2562                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2563                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2564                        assert_matches!(
2565                            &diffs[0],
2566                            VectorDiff::Insert { index: 0, value: event } => {
2567                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2568                            }
2569                        );
2570                    }
2571                );
2572            }
2573        }
2574    }
2575
2576    #[async_test]
2577    async fn test_load_when_dirty() {
2578        let room_id_0 = room_id!("!raclette:patate.ch");
2579        let room_id_1 = room_id!("!morbiflette:patate.ch");
2580
2581        // The storage shared by the two clients.
2582        let event_cache_store = MemoryStore::new();
2583
2584        // Client for the process 0.
2585        let client_p0 = MockClientBuilder::new(None)
2586            .on_builder(|builder| {
2587                builder.store_config(
2588                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2589                        .event_cache_store(event_cache_store.clone()),
2590                )
2591            })
2592            .build()
2593            .await;
2594
2595        // Client for the process 1.
2596        let client_p1 = MockClientBuilder::new(None)
2597            .on_builder(|builder| {
2598                builder.store_config(
2599                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2600                        .event_cache_store(event_cache_store),
2601                )
2602            })
2603            .build()
2604            .await;
2605
2606        // Subscribe the event caches, and create the room.
2607        let (room_event_cache_0_p0, room_event_cache_0_p1) = {
2608            let event_cache_p0 = client_p0.event_cache();
2609            event_cache_p0.subscribe().unwrap();
2610
2611            let event_cache_p1 = client_p1.event_cache();
2612            event_cache_p1.subscribe().unwrap();
2613
2614            client_p0.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2615            client_p0.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2616
2617            client_p1.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2618            client_p1.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2619
2620            let (room_event_cache_0_p0, _drop_handles) =
2621                client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2622            let (room_event_cache_0_p1, _drop_handles) =
2623                client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2624
2625            (room_event_cache_0_p0, room_event_cache_0_p1)
2626        };
2627
2628        // Let's make the cross-process lock over the store dirty.
2629        {
2630            drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
2631            drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
2632        }
2633
2634        // Create the `RoomEventCache` for `room_id_1`. During its creation, the
2635        // cross-process lock over the store MUST be dirty, which makes no difference as
2636        // a clean one: the state is just loaded, not reloaded.
2637        let (room_event_cache_1_p0, _) =
2638            client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
2639
2640        // Check the lock isn't dirty because it's been cleared.
2641        {
2642            let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
2643            assert!(guard.is_dirty().not());
2644        }
2645
2646        // The only way to test this behaviour is to see that the dirty block in
2647        // `RoomEventCacheStateLock` is covered by this test.
2648    }
2649
2650    #[async_test]
2651    async fn test_uniq_read_marker() {
2652        let client = MockClientBuilder::new(None).build().await;
2653        let room_id = room_id!("!galette:saucisse.bzh");
2654        client.base_client().get_or_create_room(room_id, RoomState::Joined);
2655
2656        let event_cache = client.event_cache();
2657
2658        event_cache.subscribe().unwrap();
2659
2660        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2661        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
2662        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
2663
2664        assert!(events.is_empty());
2665
2666        // When sending multiple times the same read marker event,…
2667        let read_marker_event = Raw::from_json_string(
2668            json!({
2669                "content": {
2670                    "event_id": "$crepe:saucisse.bzh"
2671                },
2672                "room_id": "!galette:saucisse.bzh",
2673                "type": "m.fully_read"
2674            })
2675            .to_string(),
2676        )
2677        .unwrap();
2678        let account_data = vec![read_marker_event; 100];
2679
2680        room_event_cache
2681            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
2682            .await
2683            .unwrap();
2684
2685        // … there's only one read marker update.
2686        assert_matches!(
2687            stream.recv().await.unwrap(),
2688            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
2689        );
2690
2691        assert!(stream.recv().now_or_never().is_none());
2692
2693        // None, because an account data doesn't trigger a generic update.
2694        assert!(generic_stream.recv().now_or_never().is_none());
2695    }
2696
2697    async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
2698        room_event_cache
2699            .rfind_map_event_in_memory_by(|event| {
2700                (event.event_id().as_deref() == Some(event_id)).then_some(())
2701            })
2702            .await
2703            .unwrap()
2704            .is_some()
2705    }
2706}