matrix_sdk/event_cache/caches/room/mod.rs

// Copyright 2026 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod pagination;
mod state;
mod subscriber;
mod updates;

use std::{
    collections::BTreeMap,
    fmt,
    sync::{Arc, atomic::Ordering},
};

use eyeball::SharedObservable;
use matrix_sdk_base::{
    deserialized_responses::AmbiguityChange,
    event_cache::Event,
    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
    EventId, OwnedEventId, OwnedRoomId, RoomId,
    events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
    serde::Raw,
};
pub(super) use state::{LockedRoomEventCacheState, RoomEventCacheStateLockWriteGuard};
pub use subscriber::RoomEventCacheSubscriber;
use tokio::sync::{Notify, broadcast::Receiver, mpsc};
use tracing::{instrument, trace, warn};
pub use updates::{
    RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate,
    RoomEventCacheUpdateSender,
};

use super::{
    super::{AutoShrinkChannelPayload, EventCacheError, EventsOrigin, Result, RoomPagination},
    TimelineVectorDiffs,
    event_linked_chunk::sort_positions_descending,
    thread::pagination::ThreadPagination,
};
use crate::{
    client::WeakClient,
    event_cache::{
        EventFocusThreadMode,
        caches::{event_focused::EventFocusedCache, pagination::SharedPaginationStatus},
    },
    room::WeakRoom,
};

/// A subset of an event cache, for a room.
///
/// Cloning is shallow, and thus is cheap to do.
#[derive(Clone)]
pub struct RoomEventCache {
    inner: Arc<RoomEventCacheInner>,
}

impl fmt::Debug for RoomEventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RoomEventCache").finish_non_exhaustive()
    }
}

impl RoomEventCache {
    /// Create a new [`RoomEventCache`] using the given room and store.
    pub(super) fn new(
        room_id: OwnedRoomId,
        weak_room: WeakRoom,
        state: LockedRoomEventCacheState,
        shared_pagination_status: SharedObservable<SharedPaginationStatus>,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        update_sender: RoomEventCacheUpdateSender,
    ) -> Self {
        Self {
            inner: Arc::new(RoomEventCacheInner::new(
                room_id,
                weak_room,
                state,
                shared_pagination_status,
                auto_shrink_sender,
                update_sender,
            )),
        }
    }

    /// Get the room ID for this [`RoomEventCache`].
    pub fn room_id(&self) -> &RoomId {
        &self.inner.room_id
    }

    /// Read all current events.
    ///
    /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
    /// subscriber.
    pub async fn events(&self) -> Result<Vec<Event>> {
        let state = self.inner.state.read().await?;

        Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
    }

    /// Subscribe to this room's updates, after getting the initial list of
    /// events.
    ///
    /// Use [`RoomEventCache::events`] to get all current events without the
    /// subscriber. Creating, and especially dropping, a
    /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
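    ///
    /// A minimal usage sketch (not part of the original docs); it assumes a
    /// `cache` obtained from `Room::event_cache`, and mirrors how the tests
    /// below consume the subscriber:
    ///
    /// ```no_run
    /// # async fn example(cache: &matrix_sdk::event_cache::RoomEventCache) -> Result<(), Box<dyn std::error::Error>> {
    /// let (initial_events, mut subscriber) = cache.subscribe().await?;
    /// println!("{} events initially known", initial_events.len());
    ///
    /// // Then react to live updates as they come in.
    /// while let Ok(update) = subscriber.recv().await {
    ///     // Handle the `RoomEventCacheUpdate` here.
    /// }
    /// # Ok(()) }
    /// ```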
    pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
        let state = self.inner.state.read().await?;
        let events =
            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();

        let subscriber_count = state.subscriber_count();
        let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);

        let subscriber = RoomEventCacheSubscriber::new(
            self.inner.update_sender.new_room_receiver(),
            self.inner.room_id.clone(),
            self.inner.auto_shrink_sender.clone(),
            subscriber_count.clone(),
        );

        Ok((events, subscriber))
    }

    /// Subscribe to the thread for a given root event, and get the (maybe
    /// empty) initially known list of events for that thread.
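    ///
    /// A minimal sketch (assuming a `cache` and a `thread_root` at hand):
    ///
    /// ```no_run
    /// # async fn example(cache: &matrix_sdk::event_cache::RoomEventCache, thread_root: matrix_sdk::ruma::OwnedEventId) -> Result<(), Box<dyn std::error::Error>> {
    /// let (thread_events, mut receiver) = cache.subscribe_to_thread(thread_root).await?;
    /// // `thread_events` may be empty if nothing is known about the thread yet.
    /// while let Ok(diffs) = receiver.recv().await {
    ///     // Apply the `TimelineVectorDiffs` to a local view of the thread.
    /// }
    /// # Ok(()) }
    /// ```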
    pub async fn subscribe_to_thread(
        &self,
        thread_root: OwnedEventId,
    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
        let mut state = self.inner.state.write().await?;

        state.subscribe_to_thread(thread_root).await
    }

    /// Subscribe to the pinned event cache for this room.
    ///
    /// This is a persisted view over the pinned events of a room.
    ///
    /// The pinned events are initially reloaded from storage, and/or refreshed
    /// with a network request that fetches the latest pinned events and their
    /// relations. The list of pinned events is then kept up-to-date as new
    /// events get pinned, and as new related events show up from other
    /// sources.
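    ///
    /// A minimal sketch (assuming a `cache` at hand):
    ///
    /// ```no_run
    /// # async fn example(cache: &matrix_sdk::event_cache::RoomEventCache) -> Result<(), Box<dyn std::error::Error>> {
    /// let (pinned_events, mut receiver) = cache.subscribe_to_pinned_events().await?;
    /// // `pinned_events` holds the currently known pinned events; `receiver`
    /// // yields `TimelineVectorDiffs` as the pinned list changes.
    /// # Ok(()) }
    /// ```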
    pub async fn subscribe_to_pinned_events(
        &self,
    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
        let state = self.inner.state.read().await?;

        state.subscribe_to_pinned_events(room).await
    }

    /// Create or get an event-focused timeline cache for this room.
    ///
    /// This creates a timeline centered around a specific event (e.g., for
    /// permalinks), in a given mode, supporting both forward and backward
    /// pagination.
    ///
    /// If the focused event is part of a thread, the timeline will
    /// automatically use thread-specific pagination.
    ///
    /// If the thread mode is set to [`EventFocusThreadMode::ForceThread`],
    /// the timeline will focus on the root of the thread the target event
    /// belongs to; if the target event isn't part of a thread, the target
    /// event itself is considered to be the thread root.
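    ///
    /// A usage sketch; the `cache` and `event_id` bindings are assumed, as is
    /// the re-export of [`EventFocusThreadMode`] from the `event_cache`
    /// module:
    ///
    /// ```no_run
    /// # async fn example(cache: &matrix_sdk::event_cache::RoomEventCache, event_id: matrix_sdk::ruma::OwnedEventId) -> Result<(), Box<dyn std::error::Error>> {
    /// use matrix_sdk::event_cache::EventFocusThreadMode;
    ///
    /// // Load up to 20 context events around the target event, forcing a
    /// // thread-focused view.
    /// let focused = cache
    ///     .get_or_create_event_focused_cache(event_id, 20, EventFocusThreadMode::ForceThread)
    ///     .await?;
    /// # Ok(()) }
    /// ```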
    #[instrument(skip(self), fields(room_id = %self.inner.room_id, event_id = %event_id, thread_mode = ?thread_mode))]
    pub async fn get_or_create_event_focused_cache(
        &self,
        event_id: OwnedEventId,
        num_context_events: u16,
        thread_mode: EventFocusThreadMode,
    ) -> Result<EventFocusedCache> {
        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
        let guard = self.inner.state.read().await?;

        // Check if we already have a cache for this event.
        if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
            trace!("the cache was already created, returning it");
            return Ok(cache);
        }

        // Create a new cache.
        let linked_chunk_update_sender = guard.state.linked_chunk_update_sender.clone();

        // Make sure to drop the guard before calling `start_from` below, as it may need
        // to lock the room event cache's state again, when memoizing events
        // received from the network response.
        drop(guard);

        let room_id = room.room_id().to_owned();
        let weak_room = WeakRoom::new(WeakClient::from_client(&room.client()), room_id.clone());

        trace!("creating a fresh event-focused cache");
        let cache = EventFocusedCache::new(weak_room, event_id.clone(), linked_chunk_update_sender);

        // Initialize the cache from the server.
        cache.start_from(room, num_context_events, thread_mode).await?;

        let mut guard = self.inner.state.write().await?;

        // Check again if we already have a cache for this event, just in case there was
        // a race with another caller during initialization.
        if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
            trace!("another cache has been racily created, returning it");
            return Ok(cache);
        }

        // Insert the cache in the map.
        guard.insert_event_focused_cache(event_id, thread_mode, cache.clone());

        Ok(cache)
    }

    /// Get an event-focused cache for this event and thread mode, if it exists.
    ///
    /// Otherwise, returns `None`.
    ///
    /// Use [`Self::get_or_create_event_focused_cache`] to ensure such a
    /// cache exists.
    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
    pub async fn get_event_focused_cache(
        &self,
        event_id: OwnedEventId,
        thread_mode: EventFocusThreadMode,
    ) -> Result<Option<EventFocusedCache>> {
        Ok(self.inner.state.read().await?.get_event_focused_cache(event_id, thread_mode))
    }

    /// Return a [`RoomPagination`] type useful for running back-pagination
    /// queries in the current room.
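    ///
    /// A sketch of a single back-pagination step, mirroring how the tests
    /// below drive it:
    ///
    /// ```no_run
    /// # async fn example(cache: &matrix_sdk::event_cache::RoomEventCache) -> Result<(), Box<dyn std::error::Error>> {
    /// // Ask for up to 20 earlier events; subscribers will observe the new
    /// // events as `VectorDiff`s.
    /// cache.pagination().run_backwards_once(20).await?;
    /// # Ok(()) }
    /// ```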
    pub fn pagination(&self) -> RoomPagination {
        RoomPagination::new(self.inner.clone())
    }

    /// Return a [`ThreadPagination`] type useful for running back-pagination
    /// queries in the `thread_id` thread.
    pub async fn thread_pagination(&self, thread_id: OwnedEventId) -> Result<ThreadPagination> {
        Ok(self.inner.state.write().await?.get_or_reload_thread(thread_id).pagination())
    }

    /// Try to find a single event in this room, starting from the most recent
    /// event.
    ///
    /// The `predicate` receives the current event as its single argument.
    ///
    /// **Warning**! It only looks at the events loaded in the in-memory linked
    /// chunk; it doesn't look inside the storage.
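    ///
    /// A minimal sketch: map the most recent in-memory event to its event ID,
    /// if any (the `cache` binding is assumed):
    ///
    /// ```no_run
    /// # async fn example(cache: &matrix_sdk::event_cache::RoomEventCache) -> Result<(), Box<dyn std::error::Error>> {
    /// let latest_event_id = cache
    ///     .rfind_map_event_in_memory_by(|event| event.event_id())
    ///     .await?;
    /// # Ok(()) }
    /// ```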
    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
    where
        P: FnMut(&Event) -> Option<O>,
    {
        Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
    }

    /// Try to find an event by ID in this room.
    ///
    /// It starts by looking into loaded events before looking inside the
    /// storage.
    pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
        Ok(self
            .inner
            .state
            .read()
            .await?
            .find_event(event_id)
            .await
            .ok()
            .flatten()
            .map(|(_loc, event)| event))
    }

    /// Try to find an event by ID in this room, along with its related events.
    ///
    /// You can filter which types of related events to retrieve using
    /// `filter`. `None` will retrieve related events of any type.
    ///
    /// The related events are sorted like this:
    ///
    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
    ///   located at the beginning of the array.
    /// - events present in the linked chunk (be it in memory or in the storage)
    ///   will be sorted according to their ordering in the linked chunk.
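    ///
    /// A sketch that fetches an event together with its edits only, mirroring
    /// the filtered-relations test below:
    ///
    /// ```no_run
    /// # async fn example(cache: &matrix_sdk::event_cache::RoomEventCache, event_id: &matrix_sdk::ruma::EventId) -> Result<(), Box<dyn std::error::Error>> {
    /// use matrix_sdk::ruma::events::relation::RelationType;
    ///
    /// let filter = Some(vec![RelationType::Replacement]);
    /// if let Some((event, edits)) = cache.find_event_with_relations(event_id, filter).await? {
    ///     println!("found {} edit(s) of {:?}", edits.len(), event.event_id());
    /// }
    /// # Ok(()) }
    /// ```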
    pub async fn find_event_with_relations(
        &self,
        event_id: &EventId,
        filter: Option<Vec<RelationType>>,
    ) -> Result<Option<(Event, Vec<Event>)>> {
        // Search in all loaded or stored events.
        Ok(self
            .inner
            .state
            .read()
            .await?
            .find_event_with_relations(event_id, filter.clone())
            .await
            .ok()
            .flatten())
    }

    /// Try to find the related events for an event by ID in this room.
    ///
    /// You can filter which types of related events to retrieve using
    /// `filter`. `None` will retrieve related events of any type.
    ///
    /// The related events are sorted like this:
    ///
    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
    ///   located at the beginning of the array.
    /// - events present in the linked chunk (be it in memory or in the storage)
    ///   will be sorted according to their ordering in the linked chunk.
    pub async fn find_event_relations(
        &self,
        event_id: &EventId,
        filter: Option<Vec<RelationType>>,
    ) -> Result<Vec<Event>> {
        // Search in all loaded or stored events.
        self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
    }

    /// Clear all the storage for this [`RoomEventCache`].
    ///
    /// This will get rid of all the events from the linked chunk and persisted
    /// storage.
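    ///
    /// A sketch (assuming a `cache` at hand); as exercised by the tests below,
    /// subscribers then observe a `VectorDiff::Clear`:
    ///
    /// ```no_run
    /// # async fn example(cache: &matrix_sdk::event_cache::RoomEventCache) -> Result<(), Box<dyn std::error::Error>> {
    /// cache.clear().await?;
    /// assert!(cache.events().await?.is_empty());
    /// # Ok(()) }
    /// ```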
    pub async fn clear(&self) -> Result<()> {
        // Clear the linked chunk and persisted storage.
        let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;

        // Notify observers about the update.
        self.inner.update_sender.send(
            RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                diffs: updates_as_vector_diffs,
                origin: EventsOrigin::Cache,
            }),
            Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
        );

        Ok(())
    }

    /// Return a reference to the state.
    pub(in super::super) fn state(&self) -> &LockedRoomEventCacheState {
        &self.inner.state
    }

    /// Handle a [`JoinedRoomUpdate`].
    #[instrument(skip_all, fields(room_id = %self.room_id()))]
    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
        self.inner
            .handle_timeline(updates.timeline, updates.ephemeral.clone(), updates.ambiguity_changes)
            .await?;
        self.inner.handle_account_data(updates.account_data);

        Ok(())
    }

    /// Handle a [`LeftRoomUpdate`].
    #[instrument(skip_all, fields(room_id = %self.room_id()))]
    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
        self.inner.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;

        Ok(())
    }

    /// Get a reference to the [`RoomEventCacheUpdateSender`].
    pub(in super::super) fn update_sender(&self) -> &RoomEventCacheUpdateSender {
        &self.inner.update_sender
    }

    /// Handle a single event from the `SendQueue`.
    pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
        self.inner
            .handle_timeline(
                Timeline { limited: false, prev_batch: None, events: vec![event] },
                Vec::new(),
                BTreeMap::new(),
            )
            .await
    }

    /// Save some events in the event cache, for further retrieval with
    /// [`Self::find_event`].
    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
        match self.inner.state.write().await {
            Ok(mut state_guard) => {
                if let Err(err) = state_guard.save_events(events).await {
                    warn!("couldn't save event in the event cache: {err}");
                }
            }

            Err(err) => {
                warn!("couldn't save event in the event cache: {err}");
            }
        }
    }

    /// Return a nice debug string (a vector of lines) for the linked chunk of
    /// events for this room.
    pub async fn debug_string(&self) -> Vec<String> {
        match self.inner.state.read().await {
            Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
            Err(err) => {
                warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");

                vec![]
            }
        }
    }
}

/// The (non-cloneable) details of the `RoomEventCache`.
pub(super) struct RoomEventCacheInner {
    /// The room id for this room.
    room_id: OwnedRoomId,

    pub weak_room: WeakRoom,

    /// State for this room's event cache.
    pub state: LockedRoomEventCacheState,

    /// A notifier that we received a new pagination token.
    pub pagination_batch_token_notifier: Notify,

    pub shared_pagination_status: SharedObservable<SharedPaginationStatus>,

    /// Sender to the auto-shrink channel.
    ///
    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
    /// more details.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Update sender for this room.
    update_sender: RoomEventCacheUpdateSender,
}

impl RoomEventCacheInner {
    /// Creates a new cache for a room, and subscribes to room updates, so as
    /// to handle new timeline events.
    fn new(
        room_id: OwnedRoomId,
        weak_room: WeakRoom,
        state: LockedRoomEventCacheState,
        shared_pagination_status: SharedObservable<SharedPaginationStatus>,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        update_sender: RoomEventCacheUpdateSender,
    ) -> Self {
        Self {
            room_id,
            weak_room,
            state,
            update_sender,
            pagination_batch_token_notifier: Default::default(),
            auto_shrink_sender,
            shared_pagination_status,
        }
    }

    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
        if account_data.is_empty() {
            return;
        }

        let mut handled_read_marker = false;

        trace!("Handling account data");

        for raw_event in account_data {
            match raw_event.deserialize() {
                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
                    // If duplicated, do not forward the read marker multiple
                    // times, to avoid cluttering the update channel.
                    if handled_read_marker {
                        continue;
                    }

                    handled_read_marker = true;

                    // Propagate to observers. (We ignore the error if there aren't any.)
                    self.update_sender.send(
                        RoomEventCacheUpdate::MoveReadMarkerTo { event_id: ev.content.event_id },
                        None,
                    );
                }

                Ok(_) => {
                    // We're not interested in other room account data updates,
                    // at this point.
                }

                Err(e) => {
                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
                    warn!(event_type, "Failed to deserialize account data: {e}");
                }
            }
        }
    }

    /// Handle a [`Timeline`], i.e. new events received by a sync for this
    /// room.
    async fn handle_timeline(
        &self,
        timeline: Timeline,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        if timeline.events.is_empty()
            && timeline.prev_batch.is_none()
            && ephemeral_events.is_empty()
            && ambiguity_changes.is_empty()
        {
            return Ok(());
        }

        // Add all the events to the backend.
        trace!("adding new events");

        let (stored_prev_batch_token, timeline_event_diffs) =
            self.state.write().await?.handle_sync(timeline, &ephemeral_events).await?;

        // Now that all events have been added, we can trigger the
        // `pagination_token_notifier`.
        if stored_prev_batch_token {
            self.pagination_batch_token_notifier.notify_one();
        }

        // The order matters here: first send the timeline event diffs, and only
        // then the related events (read receipts, etc.).
        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Sync,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        if !ephemeral_events.is_empty() {
            self.update_sender
                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events }, None);
        }

        if !ambiguity_changes.is_empty() {
            self.update_sender
                .send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes }, None);
        }

        Ok(())
    }
}

#[derive(Clone, Copy)]
pub(in super::super) enum PostProcessingOrigin {
    Sync,
    Backpagination,
    #[cfg(feature = "e2e-encryption")]
    Redecryption,
}

#[cfg(test)]
mod tests {
    use matrix_sdk_base::{RoomState, event_cache::Event};
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        RoomId, event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };

    use crate::test_utils::logged_in_client;

    #[async_test]
    async fn test_find_event_by_id_with_edit_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("* An edited event")
                .edit(
                    original_id,
                    RoomMessageEventContentWithoutRelation::text_plain("An edited event"),
                )
                .event_id(related_id)
                .into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_thread_reply_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_reaction_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.reaction(original_id, ":D").event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_poll_response_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_poll_end_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_filtered_relationships() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the associated related event, which reacts to the related event.
        room_event_cache.save_events([associated_related_event]).await;

        let filter = Some(vec![RelationType::Replacement]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // There's only the edit event (an edit event can't have its own edit event).
        assert_eq!(related_events.len(), 1);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);

        // Now we'll filter for threads instead; there should be no related events.
        let filter = Some(vec![RelationType::Thread]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");

        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);
        // No Thread related events found
        assert!(related_events.is_empty());
    }

    #[async_test]
    async fn test_find_event_by_id_with_recursive_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the associated related event, which reacts to the related event.
        room_event_cache.save_events([associated_related_event]).await;

        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, None)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // Both the directly related event and the recursively related one are
        // present.
        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);
    }

    async fn assert_relations(
        room_id: &RoomId,
        original_event: Event,
        related_event: Event,
        event_factory: EventFactory,
    ) {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        let original_event_id = original_event.event_id().unwrap();
        room_event_cache.save_events([original_event]).await;

        // Save an unrelated event to check it's not in the related events list.
        let unrelated_id = event_id!("$2");
        room_event_cache
            .save_events([event_factory
                .text_msg("An unrelated event")
                .event_id(unrelated_id)
                .into()])
            .await;

        // Save the related event.
        let related_id = related_event.event_id().unwrap();
        room_event_cache.save_events([related_event]).await;

        let (event, related_events) = room_event_cache
            .find_event_with_relations(&original_event_id, None)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_event_id);

        // Only the actually related event is in the list of related events.
        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
    }
}

#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
mod timed_tests {
    use std::{ops::Not, sync::Arc};

    use assert_matches::assert_matches;
    use assert_matches2::assert_let;
    use eyeball_im::VectorDiff;
    use futures_util::FutureExt;
    use matrix_sdk_base::{
        RoomState,
        event_cache::{
            Gap,
            store::{EventCacheStore as _, MemoryStore},
        },
        linked_chunk::{
            ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
            lazy_loader::from_all_chunks,
        },
        store::StoreConfig,
        sync::{JoinedRoomUpdate, Timeline},
    };
    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
    use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
    use ruma::{
        EventId, OwnedUserId, event_id,
        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
        room_id,
        serde::Raw,
        user_id,
    };
    use serde_json::json;
    use tokio::task::yield_now;

    use super::{
        super::{
            super::TimelineVectorDiffs, lock::Reload as _,
            pagination::LoadMoreEventsBackwardsOutcome,
        },
        RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
    };
    use crate::{assert_let_timeout, test_utils::client::MockClientBuilder};

    #[async_test]
    async fn test_write_to_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Propagate an update for a message and a prev-batch token.
        let timeline = Timeline {
            limited: true,
            prev_batch: Some("raclette".to_owned()),
            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
        };

        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        // Check the storage.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 2);

        let mut chunks = linked_chunk.chunks();

        // We start with the gap.
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
            assert_eq!(gap.token, "raclette");
        });

        // Then we have the stored event.
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            let deserialized = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
        });

        // That's all, folks!
        assert!(chunks.next().is_none());
    }

    #[async_test]
    async fn test_write_to_storage_strips_bundled_relations() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Propagate an update for a message with bundled relations.
        let ev = f
            .text_msg("hey yo")
            .sender(*ALICE)
            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
            .into_event();

        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };

        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        // The in-memory linked chunk keeps the bundled relation.
        {
            let events = room_event_cache.events().await.unwrap();

            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(
                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
            );

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_some());
        }

        // The one in storage does not.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 1);

        let mut chunks = linked_chunk.chunks();
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_none());
        });

        // That's all, folks!
        assert!(chunks.next().is_none());
    }

    #[async_test]
    async fn test_clear() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { token: "comté".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // The room knows about all cached events.
        {
            assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
            assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
        }

        // But only part of the events is loaded from the store.
        {
            // The room must contain only one event because only one chunk has been loaded.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].event_id().unwrap(), event_id2);

            assert!(stream.is_empty());
        }

        // Let's load more chunks to load all events.
        {
            room_event_cache.pagination().run_backwards_once(20).await.unwrap();

            assert_let_timeout!(
                Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                    stream.recv()
            );
            assert_eq!(diffs.len(), 1);
            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
                // Here you are `event_id1`!
                assert_eq!(event.event_id().unwrap(), event_id1);
            });

            assert!(stream.is_empty());

            assert_let_timeout!(
                Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
                    generic_stream.recv()
            );
            assert_eq!(room_id, expected_room_id);
            assert!(generic_stream.is_empty());
        }

        // After clearing,…
        room_event_cache.clear().await.unwrap();

        // … we get an update that the content has been cleared.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_let!(VectorDiff::Clear = &diffs[0]);

        // … same with a generic update.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
        );
        assert_eq!(received_room_id, room_id);
        assert!(generic_stream.is_empty());

        // Individual events are not forgotten by the event cache after clearing
        // a room.
        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());

        // But their presence in a linked chunk is forgotten.
        let items = room_event_cache.events().await.unwrap();
        assert!(items.is_empty());

        // The event cache store too.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        // Note: while the event cache store could return `None` here, clearing it will
        // reset it to its initial form, maintaining the invariant that it
        // contains a single items chunk that's empty.
        assert_eq!(linked_chunk.num_items(), 0);
    }

    #[async_test]
    async fn test_load_from_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { token: "cheddar".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        // Let's check whether the generic updates are received for the initialisation.
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // The room event cache has been loaded. A generic update must have been
        // triggered.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );
        assert!(generic_stream.is_empty());

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();

        // The initial items contain one event because only the last chunk is loaded by
        // default.
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].event_id().unwrap(), event_id2);
        assert!(stream.is_empty());

        // The event cache still knows about all the events, though, even if they
        // aren't loaded.
1311        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1312        assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
1313
1314        // Let's paginate to load more events.
1315        room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1316
1317        assert_let_timeout!(
1318            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1319                stream.recv()
1320        );
1321        assert_eq!(diffs.len(), 1);
1322        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1323            assert_eq!(event.event_id().unwrap(), event_id1);
1324        });
1325
1326        assert!(stream.is_empty());
1327
1328        // A generic update is triggered too.
1329        assert_matches!(
1330            generic_stream.recv().await,
1331            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1332                assert_eq!(expected_room_id, room_id);
1333            }
1334        );
1335        assert!(generic_stream.is_empty());
1336
1337        // A new update with one of these events leads to deduplication.
1338        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
1339
1340        room_event_cache
1341            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1342            .await
1343            .unwrap();
1344
1345        // Just checking the generic update is correct. There is a duplicate event, so
1346        // no generic changes whatsoever!
1347        assert!(generic_stream.recv().now_or_never().is_none());
1348
1349        // The stream doesn't report these changes *yet*. Use the items vector given
1350        // when subscribing, to check that the items correspond to their new
1351        // positions. The duplicated item is removed (so it's not the first
1352        // element anymore), and it's added to the back of the list.
1353        let items = room_event_cache.events().await.unwrap();
1354        assert_eq!(items.len(), 2);
1355        assert_eq!(items[0].event_id().unwrap(), event_id1);
1356        assert_eq!(items[1].event_id().unwrap(), event_id2);
1357    }
1358
1359    #[async_test]
1360    async fn test_load_from_storage_resilient_to_failure() {
1361        let room_id = room_id!("!fondue:patate.ch");
1362        let event_cache_store = Arc::new(MemoryStore::new());
1363
1364        let event = EventFactory::new()
1365            .room(room_id)
1366            .sender(user_id!("@ben:saucisse.bzh"))
1367            .text_msg("foo")
1368            .event_id(event_id!("$42"))
1369            .into_event();
1370
1371        // Prefill the store with invalid data: two chunks that form a cycle.
1372        event_cache_store
1373            .handle_linked_chunk_updates(
1374                LinkedChunkId::Room(room_id),
1375                vec![
1376                    Update::NewItemsChunk {
1377                        previous: None,
1378                        new: ChunkIdentifier::new(0),
1379                        next: None,
1380                    },
1381                    Update::PushItems {
1382                        at: Position::new(ChunkIdentifier::new(0), 0),
1383                        items: vec![event],
1384                    },
1385                    Update::NewItemsChunk {
1386                        previous: Some(ChunkIdentifier::new(0)),
1387                        new: ChunkIdentifier::new(1),
1388                        next: Some(ChunkIdentifier::new(0)),
1389                    },
1390                ],
1391            )
1392            .await
1393            .unwrap();
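
        // Note how chunk 1 declares chunk 0 as both its `previous` and its
        // `next` chunk: that's the cycle which makes this data invalid.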
1394
1395        let client = MockClientBuilder::new(None)
1396            .on_builder(|builder| {
1397                builder.store_config(
1398                    StoreConfig::new(CrossProcessLockConfig::multi_process("holder"))
1399                        .event_cache_store(event_cache_store.clone()),
1400                )
1401            })
1402            .build()
1403            .await;
1404
1405        let event_cache = client.event_cache();
1406
1407        // Don't forget to subscribe and like.
1408        event_cache.subscribe().unwrap();
1409
1410        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1411        let room = client.get_room(room_id).unwrap();
1412
1413        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1414
1415        let items = room_event_cache.events().await.unwrap();
1416
1417        // Because the persisted content was invalid, the room store is reset: there are
1418        // no events in the cache.
1419        assert!(items.is_empty());
1420
        // Storage doesn't contain anything. It would also be valid for it to
        // contain a single, initial, empty items chunk.
1423        let raw_chunks =
1424            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
1425        assert!(raw_chunks.is_empty());
1426    }
1427
1428    #[async_test]
1429    async fn test_no_useless_gaps() {
1430        let room_id = room_id!("!galette:saucisse.bzh");
1431
1432        let client = MockClientBuilder::new(None).build().await;
1433
1434        let event_cache = client.event_cache();
1435        event_cache.subscribe().unwrap();
1436
1437        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1438        let room = client.get_room(room_id).unwrap();
1439        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1440        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1441
1442        let f = EventFactory::new().room(room_id).sender(*ALICE);
1443
1444        // Propagate an update including a limited timeline with one message and a
1445        // prev-batch token.
1446        room_event_cache
1447            .handle_joined_room_update(JoinedRoomUpdate {
1448                timeline: Timeline {
1449                    limited: true,
1450                    prev_batch: Some("raclette".to_owned()),
1451                    events: vec![f.text_msg("hey yo").into_event()],
1452                },
1453                ..Default::default()
1454            })
1455            .await
1456            .unwrap();
1457
1458        // Just checking the generic update is correct.
1459        assert_matches!(
1460            generic_stream.recv().await,
1461            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1462                assert_eq!(expected_room_id, room_id);
1463            }
1464        );
1465        assert!(generic_stream.is_empty());
1466
1467        {
1468            let state = room_event_cache.inner.state.read().await.unwrap();
1469
1470            let mut num_gaps = 0;
1471            let mut num_events = 0;
1472
1473            for c in state.room_linked_chunk().chunks() {
1474                match c.content() {
1475                    ChunkContent::Items(items) => num_events += items.len(),
1476                    ChunkContent::Gap(_) => num_gaps += 1,
1477                }
1478            }
1479
            // The limited sync unloads all but the last chunk, so it appears as
            // if there are only the events; the gap chunk stays in the store,
            // unloaded.
1482            assert_eq!(num_gaps, 0);
1483            assert_eq!(num_events, 1);
1484        }
1485
        // But if we manually load more of the linked chunk, the gap will be
        // present.
1487        assert_matches!(
1488            room_event_cache.pagination().load_more_events_backwards().await.unwrap(),
1489            LoadMoreEventsBackwardsOutcome::Gap { .. }
1490        );
1491
1492        {
1493            let state = room_event_cache.inner.state.read().await.unwrap();
1494
1495            let mut num_gaps = 0;
1496            let mut num_events = 0;
1497
1498            for c in state.room_linked_chunk().chunks() {
1499                match c.content() {
1500                    ChunkContent::Items(items) => num_events += items.len(),
1501                    ChunkContent::Gap(_) => num_gaps += 1,
1502                }
1503            }
1504
1505            // The gap must have been stored.
1506            assert_eq!(num_gaps, 1);
1507            assert_eq!(num_events, 1);
1508        }
1509
1510        // Now, propagate an update for another message, but the timeline isn't limited
1511        // this time.
1512        room_event_cache
1513            .handle_joined_room_update(JoinedRoomUpdate {
1514                timeline: Timeline {
1515                    limited: false,
1516                    prev_batch: Some("fondue".to_owned()),
1517                    events: vec![f.text_msg("sup").into_event()],
1518                },
1519                ..Default::default()
1520            })
1521            .await
1522            .unwrap();
1523
1524        // Just checking the generic update is correct.
1525        assert_matches!(
1526            generic_stream.recv().await,
1527            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1528                assert_eq!(expected_room_id, room_id);
1529            }
1530        );
1531        assert!(generic_stream.is_empty());
1532
1533        {
1534            let state = room_event_cache.inner.state.read().await.unwrap();
1535
1536            let mut num_gaps = 0;
1537            let mut num_events = 0;
1538
1539            for c in state.room_linked_chunk().chunks() {
1540                match c.content() {
1541                    ChunkContent::Items(items) => num_events += items.len(),
1542                    ChunkContent::Gap(gap) => {
1543                        assert_eq!(gap.token, "raclette");
1544                        num_gaps += 1;
1545                    }
1546                }
1547            }
1548
1549            // There's only the previous gap, no new ones.
1550            assert_eq!(num_gaps, 1);
1551            assert_eq!(num_events, 2);
1552        }
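
        // In short: a gap is only created when the timeline is limited. The
        // prev-batch token of a non-limited sync response is ignored, since
        // nothing is missing between the new events and the known ones.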
1553    }
1554
1555    #[async_test]
1556    async fn test_shrink_to_last_chunk() {
1557        let room_id = room_id!("!galette:saucisse.bzh");
1558
1559        let client = MockClientBuilder::new(None).build().await;
1560
1561        let f = EventFactory::new().room(room_id);
1562
1563        let evid1 = event_id!("$1");
1564        let evid2 = event_id!("$2");
1565
1566        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1567        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1568
        // Fill the event cache store with an initial linked chunk with two event chunks.
1570        {
1571            client
1572                .event_cache_store()
1573                .lock()
1574                .await
1575                .expect("Could not acquire the event cache lock")
1576                .as_clean()
1577                .expect("Could not acquire a clean event cache lock")
1578                .handle_linked_chunk_updates(
1579                    LinkedChunkId::Room(room_id),
1580                    vec![
1581                        Update::NewItemsChunk {
1582                            previous: None,
1583                            new: ChunkIdentifier::new(0),
1584                            next: None,
1585                        },
1586                        Update::PushItems {
1587                            at: Position::new(ChunkIdentifier::new(0), 0),
1588                            items: vec![ev1],
1589                        },
1590                        Update::NewItemsChunk {
1591                            previous: Some(ChunkIdentifier::new(0)),
1592                            new: ChunkIdentifier::new(1),
1593                            next: None,
1594                        },
1595                        Update::PushItems {
1596                            at: Position::new(ChunkIdentifier::new(1), 0),
1597                            items: vec![ev2],
1598                        },
1599                    ],
1600                )
1601                .await
1602                .unwrap();
1603        }
1604
1605        let event_cache = client.event_cache();
1606        event_cache.subscribe().unwrap();
1607
1608        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1609        let room = client.get_room(room_id).unwrap();
1610        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1611
        // Sanity check: the linked chunk is lazily loaded, so it only includes
        // one item at the start.
1613        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
1614        assert_eq!(events.len(), 1);
1615        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1616        assert!(stream.is_empty());
1617
1618        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1619
1620        // Force loading the full linked chunk by back-paginating.
1621        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1622        assert_eq!(outcome.events.len(), 1);
1623        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1624        assert!(outcome.reached_start);
1625
1626        // We also get an update about the loading from the store.
1627        assert_let_timeout!(
1628            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1629                stream.recv()
1630        );
1631        assert_eq!(diffs.len(), 1);
1632        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1633            assert_eq!(value.event_id().as_deref(), Some(evid1));
1634        });
1635
1636        assert!(stream.is_empty());
1637
1638        // Same for the generic update.
1639        assert_let_timeout!(
1640            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1641        );
1642        assert_eq!(expected_room_id, room_id);
1643        assert!(generic_stream.is_empty());
1644
1645        // Shrink the linked chunk to the last chunk.
1646        room_event_cache
1647            .inner
1648            .state
1649            .write()
1650            .await
1651            .unwrap()
1652            .reload()
1653            .await
1654            .expect("shrinking should succeed");
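
        // Reloading drops all but the last chunk from memory; the store still
        // contains both chunks, so the back-pagination below can find `ev1`
        // again without hitting the network.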
1655
1656        // We receive updates about the changes to the linked chunk.
1657        assert_let_timeout!(
1658            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1659                stream.recv()
1660        );
1661        assert_eq!(diffs.len(), 2);
1662        assert_matches!(&diffs[0], VectorDiff::Clear);
        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
1664            assert_eq!(values.len(), 1);
1665            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
1666        });
1667
1668        assert!(stream.is_empty());
1669
1670        // A generic update has been received.
1671        assert_let_timeout!(Ok(RoomEventCacheGenericUpdate { .. }) = generic_stream.recv());
1672        assert!(generic_stream.is_empty());
1673
1674        // When reading the events, we do get only the last one.
1675        let events = room_event_cache.events().await.unwrap();
1676        assert_eq!(events.len(), 1);
1677        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1678
        // But if we back-paginate, we don't need network access to find out
        // about the previous event.
1681        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1682        assert_eq!(outcome.events.len(), 1);
1683        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1684        assert!(outcome.reached_start);
1685    }
1686
1687    #[async_test]
1688    async fn test_room_ordering() {
1689        let room_id = room_id!("!galette:saucisse.bzh");
1690
1691        let client = MockClientBuilder::new(None).build().await;
1692
1693        let f = EventFactory::new().room(room_id).sender(*ALICE);
1694
1695        let evid1 = event_id!("$1");
1696        let evid2 = event_id!("$2");
1697        let evid3 = event_id!("$3");
1698
1699        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
1700        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1701        let ev3 = f.text_msg("yo").event_id(evid3).into_event();
1702
        // Fill the event cache store with an initial linked chunk with two event chunks.
1704        {
1705            client
1706                .event_cache_store()
1707                .lock()
1708                .await
1709                .expect("Could not acquire the event cache lock")
1710                .as_clean()
1711                .expect("Could not acquire a clean event cache lock")
1712                .handle_linked_chunk_updates(
1713                    LinkedChunkId::Room(room_id),
1714                    vec![
1715                        Update::NewItemsChunk {
1716                            previous: None,
1717                            new: ChunkIdentifier::new(0),
1718                            next: None,
1719                        },
1720                        Update::PushItems {
1721                            at: Position::new(ChunkIdentifier::new(0), 0),
1722                            items: vec![ev1, ev2],
1723                        },
1724                        Update::NewItemsChunk {
1725                            previous: Some(ChunkIdentifier::new(0)),
1726                            new: ChunkIdentifier::new(1),
1727                            next: None,
1728                        },
1729                        Update::PushItems {
1730                            at: Position::new(ChunkIdentifier::new(1), 0),
1731                            items: vec![ev3.clone()],
1732                        },
1733                    ],
1734                )
1735                .await
1736                .unwrap();
1737        }
1738
1739        let event_cache = client.event_cache();
1740        event_cache.subscribe().unwrap();
1741
1742        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1743        let room = client.get_room(room_id).unwrap();
1744        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1745
1746        // Initially, the linked chunk only contains the last chunk, so only ev3 is
1747        // loaded.
1748        {
1749            let state = room_event_cache.inner.state.read().await.unwrap();
1750            let room_linked_chunk = state.room_linked_chunk();
1751
1752            // But we can get the order of ev1.
1753            assert_eq!(
1754                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1755                Some(0)
1756            );
1757
1758            // And that of ev2 as well.
1759            assert_eq!(
1760                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1761                Some(1)
1762            );
1763
1764            // ev3, which is loaded, also has a known ordering.
1765            let mut events = room_linked_chunk.events();
1766            let (pos, ev) = events.next().unwrap();
1767            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
1768            assert_eq!(ev.event_id().as_deref(), Some(evid3));
1769            assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1770
1771            // No other loaded events.
1772            assert!(events.next().is_none());
1773        }
1774
1775        // Force loading the full linked chunk by back-paginating.
1776        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1777        assert!(outcome.reached_start);
1778
1779        // All events are now loaded, so their order is precisely their enumerated index
1780        // in a linear iteration.
1781        {
1782            let state = room_event_cache.inner.state.read().await.unwrap();
1783            let room_linked_chunk = state.room_linked_chunk();
1784
1785            for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
1786                assert_eq!(room_linked_chunk.event_order(pos), Some(i));
1787            }
1788        }
1789
        // Handle a gappy sync with two events (one of them a duplicate, so that
        // deduplication kicks in). This shrinks the linked chunk to the last
        // chunk, which then only contains the last two events.
1794        let evid4 = event_id!("$4");
1795        room_event_cache
1796            .handle_joined_room_update(JoinedRoomUpdate {
1797                timeline: Timeline {
1798                    limited: true,
1799                    prev_batch: Some("fondue".to_owned()),
1800                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
1801                },
1802                ..Default::default()
1803            })
1804            .await
1805            .unwrap();
1806
1807        {
1808            let state = room_event_cache.inner.state.read().await.unwrap();
1809            let room_linked_chunk = state.room_linked_chunk();
1810
1811            // After the shrink, only evid3 and evid4 are loaded.
1812            let mut events = room_linked_chunk.events();
1813
1814            let (pos, ev) = events.next().unwrap();
1815            assert_eq!(ev.event_id().as_deref(), Some(evid3));
1816            assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1817
1818            let (pos, ev) = events.next().unwrap();
1819            assert_eq!(ev.event_id().as_deref(), Some(evid4));
1820            assert_eq!(room_linked_chunk.event_order(pos), Some(3));
1821
1822            // No other loaded events.
1823            assert!(events.next().is_none());
1824
1825            // But we can still get the order of previous events.
1826            assert_eq!(
1827                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1828                Some(0)
1829            );
1830            assert_eq!(
1831                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1832                Some(1)
1833            );
1834
1835            // ev3 doesn't have an order with its previous position, since it's been
1836            // deduplicated.
1837            assert_eq!(
1838                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
1839                None
1840            );
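
            // Summing up: orderings are known for all persisted events, loaded
            // or not, and deduplication invalidates the old position of a moved
            // event.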
1841        }
1842    }
1843
1844    #[async_test]
1845    async fn test_auto_shrink_after_all_subscribers_are_gone() {
1846        let room_id = room_id!("!galette:saucisse.bzh");
1847
1848        let client = MockClientBuilder::new(None).build().await;
1849
1850        let f = EventFactory::new().room(room_id);
1851
1852        let evid1 = event_id!("$1");
1853        let evid2 = event_id!("$2");
1854
1855        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1856        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1857
        // Fill the event cache store with an initial linked chunk with two event chunks.
1859        {
1860            client
1861                .event_cache_store()
1862                .lock()
1863                .await
1864                .expect("Could not acquire the event cache lock")
1865                .as_clean()
1866                .expect("Could not acquire a clean event cache lock")
1867                .handle_linked_chunk_updates(
1868                    LinkedChunkId::Room(room_id),
1869                    vec![
1870                        Update::NewItemsChunk {
1871                            previous: None,
1872                            new: ChunkIdentifier::new(0),
1873                            next: None,
1874                        },
1875                        Update::PushItems {
1876                            at: Position::new(ChunkIdentifier::new(0), 0),
1877                            items: vec![ev1],
1878                        },
1879                        Update::NewItemsChunk {
1880                            previous: Some(ChunkIdentifier::new(0)),
1881                            new: ChunkIdentifier::new(1),
1882                            next: None,
1883                        },
1884                        Update::PushItems {
1885                            at: Position::new(ChunkIdentifier::new(1), 0),
1886                            items: vec![ev2],
1887                        },
1888                    ],
1889                )
1890                .await
1891                .unwrap();
1892        }
1893
1894        let event_cache = client.event_cache();
1895        event_cache.subscribe().unwrap();
1896
1897        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1898        let room = client.get_room(room_id).unwrap();
1899        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1900
        // Sanity check: the linked chunk is lazily loaded, so it only includes
        // one item at the start.
1902        let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
1903        assert_eq!(events1.len(), 1);
1904        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
1905        assert!(stream1.is_empty());
1906
1907        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1908
1909        // Force loading the full linked chunk by back-paginating.
1910        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1911        assert_eq!(outcome.events.len(), 1);
1912        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1913        assert!(outcome.reached_start);
1914
1915        // We also get an update about the loading from the store. Ignore it, for this
1916        // test's sake.
1917        assert_let_timeout!(
1918            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1919                stream1.recv()
1920        );
1921        assert_eq!(diffs.len(), 1);
1922        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1923            assert_eq!(value.event_id().as_deref(), Some(evid1));
1924        });
1925
1926        assert!(stream1.is_empty());
1927
1928        assert_let_timeout!(
1929            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1930        );
1931        assert_eq!(expected_room_id, room_id);
1932        assert!(generic_stream.is_empty());
1933
        // Have another subscriber.
        // Since it's not the first one, and the previous one loaded some more
        // events, the second subscriber sees them all.
1937        let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
1938        assert_eq!(events2.len(), 2);
1939        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
1940        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
1941        assert!(stream2.is_empty());
1942
1943        // Drop the first stream, and wait a bit.
1944        drop(stream1);
1945        yield_now().await;
1946
1947        // The second stream remains undisturbed.
1948        assert!(stream2.is_empty());
1949
1950        // Now drop the second stream, and wait a bit.
1951        drop(stream2);
1952        yield_now().await;
1953
1954        // The linked chunk must have auto-shrunk by now.
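        // (The drop of the last subscriber notifies the auto-shrink task, via
        // the `auto_shrink_sender` channel.)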
1955
1956        {
            // Check the inner state: there are no more subscribers.
1958            let state = room_event_cache.inner.state.read().await.unwrap();
1959            assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
1960        }
1961
1962        // Getting the events will only give us the latest chunk.
1963        let events3 = room_event_cache.events().await.unwrap();
1964        assert_eq!(events3.len(), 1);
1965        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
1966    }
1967
1968    #[async_test]
1969    async fn test_rfind_map_event_in_memory_by() {
1970        let user_id = user_id!("@mnt_io:matrix.org");
1971        let room_id = room_id!("!raclette:patate.ch");
1972        let client = MockClientBuilder::new(None).build().await;
1973
1974        let event_factory = EventFactory::new().room(room_id);
1975
1976        let event_id_0 = event_id!("$ev0");
1977        let event_id_1 = event_id!("$ev1");
1978        let event_id_2 = event_id!("$ev2");
1979        let event_id_3 = event_id!("$ev3");
1980
1981        let event_0 =
1982            event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
1983        let event_1 =
1984            event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
1985        let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
1986        let event_3 =
1987            event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
1988
        // Fill the event cache store with an initial linked chunk made of two
        // chunks and four events.
1991        {
1992            client
1993                .event_cache_store()
1994                .lock()
1995                .await
1996                .expect("Could not acquire the event cache lock")
1997                .as_clean()
1998                .expect("Could not acquire a clean event cache lock")
1999                .handle_linked_chunk_updates(
2000                    LinkedChunkId::Room(room_id),
2001                    vec![
2002                        Update::NewItemsChunk {
2003                            previous: None,
2004                            new: ChunkIdentifier::new(0),
2005                            next: None,
2006                        },
2007                        Update::PushItems {
2008                            at: Position::new(ChunkIdentifier::new(0), 0),
2009                            items: vec![event_3],
2010                        },
2011                        Update::NewItemsChunk {
2012                            previous: Some(ChunkIdentifier::new(0)),
2013                            new: ChunkIdentifier::new(1),
2014                            next: None,
2015                        },
2016                        Update::PushItems {
2017                            at: Position::new(ChunkIdentifier::new(1), 0),
2018                            items: vec![event_0, event_1, event_2],
2019                        },
2020                    ],
2021                )
2022                .await
2023                .unwrap();
2024        }
2025
2026        let event_cache = client.event_cache();
2027        event_cache.subscribe().unwrap();
2028
2029        client.base_client().get_or_create_room(room_id, RoomState::Joined);
2030        let room = client.get_room(room_id).unwrap();
2031        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2032
2033        // Look for an event from `BOB`: it must be `event_0`.
2034        assert_matches!(
2035            room_event_cache
2036                .rfind_map_event_in_memory_by(|event| {
                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
                        == Some(*BOB))
                    .then(|| event.event_id())
2038                })
2039                .await,
2040            Ok(Some(event_id)) => {
2041                assert_eq!(event_id.as_deref(), Some(event_id_0));
2042            }
2043        );
2044
        // Look for an event from `ALICE`: it must be `event_2`, found before
        // `event_1`, because events are searched in reverse order.
2047        assert_matches!(
2048            room_event_cache
2049                .rfind_map_event_in_memory_by(|event| {
                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
                        == Some(*ALICE))
                    .then(|| event.event_id())
2051                })
2052                .await,
2053            Ok(Some(event_id)) => {
2054                assert_eq!(event_id.as_deref(), Some(event_id_2));
2055            }
2056        );
2057
2058        // Look for an event that is inside the storage, but not loaded.
2059        assert!(
2060            room_event_cache
2061                .rfind_map_event_in_memory_by(|event| {
2062                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
2063                        == Some(user_id))
2064                    .then(|| event.event_id())
2065                })
2066                .await
2067                .unwrap()
2068                .is_none()
2069        );
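
        // (`event_3` lives in the first chunk, which isn't loaded in memory:
        // `rfind_map_event_in_memory_by` only walks the loaded events, hence
        // the `None`.)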
2070
2071        // Look for an event that doesn't exist.
2072        assert!(
2073            room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap().is_none()
2074        );
2075    }
2076
2077    #[async_test]
2078    async fn test_reload_when_dirty() {
2079        let user_id = user_id!("@mnt_io:matrix.org");
2080        let room_id = room_id!("!raclette:patate.ch");
2081
2082        // The storage shared by the two clients.
2083        let event_cache_store = MemoryStore::new();
2084
2085        // Client for the process 0.
2086        let client_p0 = MockClientBuilder::new(None)
2087            .on_builder(|builder| {
2088                builder.store_config(
2089                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2090                        .event_cache_store(event_cache_store.clone()),
2091                )
2092            })
2093            .build()
2094            .await;
2095
2096        // Client for the process 1.
2097        let client_p1 = MockClientBuilder::new(None)
2098            .on_builder(|builder| {
2099                builder.store_config(
2100                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2101                        .event_cache_store(event_cache_store),
2102                )
2103            })
2104            .build()
2105            .await;
2106
2107        let event_factory = EventFactory::new().room(room_id).sender(user_id);
2108
2109        let ev_id_0 = event_id!("$ev_0");
2110        let ev_id_1 = event_id!("$ev_1");
2111
2112        let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
2113        let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
2114
2115        // Add events to the storage (shared by the two clients!).
2116        client_p0
2117            .event_cache_store()
2118            .lock()
2119            .await
2120            .expect("[p0] Could not acquire the event cache lock")
2121            .as_clean()
2122            .expect("[p0] Could not acquire a clean event cache lock")
2123            .handle_linked_chunk_updates(
2124                LinkedChunkId::Room(room_id),
2125                vec![
2126                    Update::NewItemsChunk {
2127                        previous: None,
2128                        new: ChunkIdentifier::new(0),
2129                        next: None,
2130                    },
2131                    Update::PushItems {
2132                        at: Position::new(ChunkIdentifier::new(0), 0),
2133                        items: vec![ev_0],
2134                    },
2135                    Update::NewItemsChunk {
2136                        previous: Some(ChunkIdentifier::new(0)),
2137                        new: ChunkIdentifier::new(1),
2138                        next: None,
2139                    },
2140                    Update::PushItems {
2141                        at: Position::new(ChunkIdentifier::new(1), 0),
2142                        items: vec![ev_1],
2143                    },
2144                ],
2145            )
2146            .await
2147            .unwrap();
2148
2149        // Subscribe the event caches, and create the room.
2150        let (room_event_cache_p0, room_event_cache_p1) = {
2151            let event_cache_p0 = client_p0.event_cache();
2152            event_cache_p0.subscribe().unwrap();
2153
2154            let event_cache_p1 = client_p1.event_cache();
2155            event_cache_p1.subscribe().unwrap();
2156
2157            client_p0.base_client().get_or_create_room(room_id, RoomState::Joined);
2158            client_p1.base_client().get_or_create_room(room_id, RoomState::Joined);
2159
2160            let (room_event_cache_p0, _drop_handles) =
2161                client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
2162            let (room_event_cache_p1, _drop_handles) =
2163                client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
2164
2165            (room_event_cache_p0, room_event_cache_p1)
2166        };
2167
2168        // Okay. We are ready for the test!
2169        //
        // First off, let's check `room_event_cache_p0` only has the last event
        // loaded in memory, then do a pagination, and see more events.
2172        let mut updates_stream_p0 = {
2173            let room_event_cache = &room_event_cache_p0;
2174
2175            let (initial_updates, mut updates_stream) =
2176                room_event_cache_p0.subscribe().await.unwrap();
2177
2178            // Initial updates contain `ev_id_1` only.
2179            assert_eq!(initial_updates.len(), 1);
2180            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2181            assert!(updates_stream.is_empty());
2182
2183            // `ev_id_1` must be loaded in memory.
2184            assert!(event_loaded(room_event_cache, ev_id_1).await);
2185
2186            // `ev_id_0` must NOT be loaded in memory.
2187            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2188
2189            // Load one more event with a backpagination.
2190            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2191
2192            // A new update for `ev_id_0` must be present.
2193            assert_matches!(
2194                updates_stream.recv().await.unwrap(),
2195                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2196                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
2197                    assert_matches!(
2198                        &diffs[0],
2199                        VectorDiff::Insert { index: 0, value: event } => {
2200                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2201                        }
2202                    );
2203                }
2204            );
2205
2206            // `ev_id_0` must now be loaded in memory.
2207            assert!(event_loaded(room_event_cache, ev_id_0).await);
2208
2209            updates_stream
2210        };
2211
        // Second, let's check `room_event_cache_p1` behaves the same way.
2213        let mut updates_stream_p1 = {
2214            let room_event_cache = &room_event_cache_p1;
2215            let (initial_updates, mut updates_stream) =
2216                room_event_cache_p1.subscribe().await.unwrap();
2217
2218            // Initial updates contain `ev_id_1` only.
2219            assert_eq!(initial_updates.len(), 1);
2220            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2221            assert!(updates_stream.is_empty());
2222
2223            // `ev_id_1` must be loaded in memory.
2224            assert!(event_loaded(room_event_cache, ev_id_1).await);
2225
2226            // `ev_id_0` must NOT be loaded in memory.
2227            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2228
2229            // Load one more event with a backpagination.
2230            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2231
2232            // A new update for `ev_id_0` must be present.
2233            assert_matches!(
2234                updates_stream.recv().await.unwrap(),
2235                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2236                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
2237                    assert_matches!(
2238                        &diffs[0],
2239                        VectorDiff::Insert { index: 0, value: event } => {
2240                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2241                        }
2242                    );
2243                }
2244            );
2245
2246            // `ev_id_0` must now be loaded in memory.
2247            assert!(event_loaded(room_event_cache, ev_id_0).await);
2248
2249            updates_stream
2250        };
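
        // From now on, each time one process takes the cross-process lock after
        // the other held it, its handle is marked as dirty: on the next access,
        // the in-memory state is reloaded from the store, which shrinks it back
        // to the last chunk.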
2251
        // Do this a couple of times, for fun.
2253        for _ in 0..3 {
2254            // Third, because `room_event_cache_p1` has locked the store, the lock
2255            // is dirty for `room_event_cache_p0`, so it will shrink to its last
2256            // chunk!
2257            {
2258                let room_event_cache = &room_event_cache_p0;
2259                let updates_stream = &mut updates_stream_p0;
2260
2261                // `ev_id_1` must be loaded in memory, just like before.
2262                assert!(event_loaded(room_event_cache, ev_id_1).await);
2263
2264                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
2265                // state has been reloaded to its last chunk.
2266                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2267
2268                // The reload can be observed via the updates too.
2269                assert_matches!(
2270                    updates_stream.recv().await.unwrap(),
2271                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2272                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2273                        assert_matches!(&diffs[0], VectorDiff::Clear);
2274                        assert_matches!(
2275                            &diffs[1],
2276                            VectorDiff::Append { values: events } => {
2277                                assert_eq!(events.len(), 1);
2278                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2279                            }
2280                        );
2281                    }
2282                );
2283
2284                // Load one more event with a backpagination.
2285                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2286
2287                // `ev_id_0` must now be loaded in memory.
2288                assert!(event_loaded(room_event_cache, ev_id_0).await);
2289
2290                // The pagination can be observed via the updates too.
2291                assert_matches!(
2292                    updates_stream.recv().await.unwrap(),
2293                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2294                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2295                        assert_matches!(
2296                            &diffs[0],
2297                            VectorDiff::Insert { index: 0, value: event } => {
2298                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2299                            }
2300                        );
2301                    }
2302                );
2303            }
2304
            // Fourth, because `room_event_cache_p0` has locked the store again,
            // the lock is dirty for `room_event_cache_p1` too, so it will
            // shrink to its last chunk!
2308            {
2309                let room_event_cache = &room_event_cache_p1;
2310                let updates_stream = &mut updates_stream_p1;
2311
2312                // `ev_id_1` must be loaded in memory, just like before.
2313                assert!(event_loaded(room_event_cache, ev_id_1).await);
2314
2315                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
2316                // state has shrunk to its last chunk.
2317                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2318
2319                // The reload can be observed via the updates too.
2320                assert_matches!(
2321                    updates_stream.recv().await.unwrap(),
2322                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2323                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2324                        assert_matches!(&diffs[0], VectorDiff::Clear);
2325                        assert_matches!(
2326                            &diffs[1],
2327                            VectorDiff::Append { values: events } => {
2328                                assert_eq!(events.len(), 1);
2329                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2330                            }
2331                        );
2332                    }
2333                );
2334
2335                // Load one more event with a backpagination.
2336                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2337
2338                // `ev_id_0` must now be loaded in memory.
2339                assert!(event_loaded(room_event_cache, ev_id_0).await);
2340
2341                // The pagination can be observed via the updates too.
2342                assert_matches!(
2343                    updates_stream.recv().await.unwrap(),
2344                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2345                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2346                        assert_matches!(
2347                            &diffs[0],
2348                            VectorDiff::Insert { index: 0, value: event } => {
2349                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2350                            }
2351                        );
2352                    }
2353                );
2354            }
2355        }
2356
2357        // Repeat that with an explicit read lock (so that we don't rely on
2358        // `event_loaded` to trigger the dirty detection).
2359        for _ in 0..3 {
2360            {
2361                let room_event_cache = &room_event_cache_p0;
2362                let updates_stream = &mut updates_stream_p0;
2363
2364                let guard = room_event_cache.inner.state.read().await.unwrap();
2365
                // The guard is kept alive, to ensure we can have multiple read
                // guards alive with shared access.
                // See `RoomEventCacheStateLock::read` to learn more.
2369
2370                // The lock is no longer marked as dirty, it's been cleaned.
2371                assert!(guard.is_dirty().not());
2372
2373                // The reload can be observed via the updates too.
2374                assert_matches!(
2375                    updates_stream.recv().await.unwrap(),
2376                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2377                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2378                        assert_matches!(&diffs[0], VectorDiff::Clear);
2379                        assert_matches!(
2380                            &diffs[1],
2381                            VectorDiff::Append { values: events } => {
2382                                assert_eq!(events.len(), 1);
2383                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2384                            }
2385                        );
2386                    }
2387                );
2388
2389                assert!(event_loaded(room_event_cache, ev_id_1).await);
2390                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2391
                // Ensure `guard` is alive up to this point (in case this test
                // is refactored, I want to make this super explicit).
                //
                // We need to drop it before the pagination, because the
                // pagination needs to obtain a write lock.
2397                drop(guard);
2398
2399                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2400                assert!(event_loaded(room_event_cache, ev_id_0).await);
2401
2402                // The pagination can be observed via the updates too.
2403                assert_matches!(
2404                    updates_stream.recv().await.unwrap(),
2405                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2406                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2407                        assert_matches!(
2408                            &diffs[0],
2409                            VectorDiff::Insert { index: 0, value: event } => {
2410                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2411                            }
2412                        );
2413                    }
2414                );
2415            }
2416
2417            {
2418                let room_event_cache = &room_event_cache_p1;
2419                let updates_stream = &mut updates_stream_p1;
2420
2421                let guard = room_event_cache.inner.state.read().await.unwrap();
2422
                // The guard is kept alive, to ensure we can have multiple read
                // guards alive with shared access.
2425
2426                // The lock is no longer marked as dirty, it's been cleaned.
2427                assert!(guard.is_dirty().not());
2428
2429                // The reload can be observed via the updates too.
2430                assert_matches!(
2431                    updates_stream.recv().await.unwrap(),
2432                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2433                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2434                        assert_matches!(&diffs[0], VectorDiff::Clear);
2435                        assert_matches!(
2436                            &diffs[1],
2437                            VectorDiff::Append { values: events } => {
2438                                assert_eq!(events.len(), 1);
2439                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2440                            }
2441                        );
2442                    }
2443                );
2444
2445                assert!(event_loaded(room_event_cache, ev_id_1).await);
2446                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2447
                // Ensure `guard` is alive up to this point (in case this test
                // is refactored, I want to make this super explicit).
                //
                // We need to drop it before the pagination, because the
                // pagination needs to obtain a write lock.
2453                drop(guard);
2454
2455                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2456                assert!(event_loaded(room_event_cache, ev_id_0).await);
2457
2458                // The pagination can be observed via the updates too.
2459                assert_matches!(
2460                    updates_stream.recv().await.unwrap(),
2461                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2462                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2463                        assert_matches!(
2464                            &diffs[0],
2465                            VectorDiff::Insert { index: 0, value: event } => {
2466                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2467                            }
2468                        );
2469                    }
2470                );
2471            }
2472        }
2473
2474        // Repeat that with an explicit write lock.
2475        for _ in 0..3 {
2476            {
2477                let room_event_cache = &room_event_cache_p0;
2478                let updates_stream = &mut updates_stream_p0;
2479
2480                let guard = room_event_cache.inner.state.write().await.unwrap();
2481
2482                // The lock is no longer marked as dirty, it's been cleaned.
2483                assert!(guard.is_dirty().not());
2484
2485                // The reload can be observed via the updates too.
2486                assert_matches!(
2487                    updates_stream.recv().await.unwrap(),
2488                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2489                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2490                        assert_matches!(&diffs[0], VectorDiff::Clear);
2491                        assert_matches!(
2492                            &diffs[1],
2493                            VectorDiff::Append { values: events } => {
2494                                assert_eq!(events.len(), 1);
2495                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2496                            }
2497                        );
2498                    }
2499                );
2500
2501                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
2502                // needs to obtain a read lock.
2503                drop(guard);
2504
2505                assert!(event_loaded(room_event_cache, ev_id_1).await);
2506                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2507
2508                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2509                assert!(event_loaded(room_event_cache, ev_id_0).await);
2510
2511                // The pagination can be observed via the updates too.
2512                assert_matches!(
2513                    updates_stream.recv().await.unwrap(),
2514                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2515                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2516                        assert_matches!(
2517                            &diffs[0],
2518                            VectorDiff::Insert { index: 0, value: event } => {
2519                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2520                            }
2521                        );
2522                    }
2523                );
2524            }
2525
2526            {
2527                let room_event_cache = &room_event_cache_p1;
2528                let updates_stream = &mut updates_stream_p1;
2529
2530                let guard = room_event_cache.inner.state.write().await.unwrap();
2531
2532                // The lock is no longer marked as dirty, it's been cleaned.
2533                assert!(guard.is_dirty().not());
2534
2535                // The reload can be observed via the updates too.
2536                assert_matches!(
2537                    updates_stream.recv().await.unwrap(),
2538                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2539                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2540                        assert_matches!(&diffs[0], VectorDiff::Clear);
2541                        assert_matches!(
2542                            &diffs[1],
2543                            VectorDiff::Append { values: events } => {
2544                                assert_eq!(events.len(), 1);
2545                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2546                            }
2547                        );
2548                    }
2549                );
2550
2551                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
2552                // needs to obtain a read lock.
2553                drop(guard);
2554
2555                assert!(event_loaded(room_event_cache, ev_id_1).await);
2556                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2557
2558                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2559                assert!(event_loaded(room_event_cache, ev_id_0).await);
2560
2561                // The pagination can be observed via the updates too.
2562                assert_matches!(
2563                    updates_stream.recv().await.unwrap(),
2564                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2565                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2566                        assert_matches!(
2567                            &diffs[0],
2568                            VectorDiff::Insert { index: 0, value: event } => {
2569                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2570                            }
2571                        );
2572                    }
2573                );
2574            }
2575        }
2576    }
2577
2578    #[async_test]
2579    async fn test_load_when_dirty() {
2580        let room_id_0 = room_id!("!raclette:patate.ch");
2581        let room_id_1 = room_id!("!morbiflette:patate.ch");
2582
2583        // The storage shared by the two clients.
2584        let event_cache_store = MemoryStore::new();
2585
2586        // Client for the process 0.
2587        let client_p0 = MockClientBuilder::new(None)
2588            .on_builder(|builder| {
2589                builder.store_config(
2590                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2591                        .event_cache_store(event_cache_store.clone()),
2592                )
2593            })
2594            .build()
2595            .await;
2596
2597        // Client for the process 1.
2598        let client_p1 = MockClientBuilder::new(None)
2599            .on_builder(|builder| {
2600                builder.store_config(
2601                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2602                        .event_cache_store(event_cache_store),
2603                )
2604            })
2605            .build()
2606            .await;
2607
        // Subscribe the event caches, and create the rooms.
2609        let (room_event_cache_0_p0, room_event_cache_0_p1) = {
2610            let event_cache_p0 = client_p0.event_cache();
2611            event_cache_p0.subscribe().unwrap();
2612
2613            let event_cache_p1 = client_p1.event_cache();
2614            event_cache_p1.subscribe().unwrap();
2615
2616            client_p0.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2617            client_p0.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2618
2619            client_p1.base_client().get_or_create_room(room_id_0, RoomState::Joined);
2620            client_p1.base_client().get_or_create_room(room_id_1, RoomState::Joined);
2621
2622            let (room_event_cache_0_p0, _drop_handles) =
2623                client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2624            let (room_event_cache_0_p1, _drop_handles) =
2625                client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
2626
2627            (room_event_cache_0_p0, room_event_cache_0_p1)
2628        };
2629
2630        // Let's make the cross-process lock over the store dirty.
2631        {
2632            drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
2633            drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
2634        }
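
        // (Taking the lock from one process right after the other held it is
        // what marks the other process' handle as dirty.)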
2635
        // Create the `RoomEventCache` for `room_id_1`. During its creation, the
        // cross-process lock over the store MUST be dirty, which makes no
        // difference compared to a clean one: the state is just loaded, not
        // reloaded.
2639        let (room_event_cache_1_p0, _) =
2640            client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
2641
        // Check the lock isn't dirty anymore: it's been cleaned.
2643        {
2644            let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
2645            assert!(guard.is_dirty().not());
2646        }
2647
        // The only way to test this behaviour is to check that the dirty code
        // path in `RoomEventCacheStateLock` is covered by this test.
2650    }
2651
2652    #[async_test]
2653    async fn test_uniq_read_marker() {
2654        let client = MockClientBuilder::new(None).build().await;
2655        let room_id = room_id!("!galette:saucisse.bzh");
2656        client.base_client().get_or_create_room(room_id, RoomState::Joined);
2657
2658        let event_cache = client.event_cache();
2659
2660        event_cache.subscribe().unwrap();
2661
2662        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2663        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
2664        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
2665
2666        assert!(events.is_empty());
2667
        // When sending the same read marker event multiple times,…
2669        let read_marker_event = Raw::from_json_string(
2670            json!({
2671                "content": {
2672                    "event_id": "$crepe:saucisse.bzh"
2673                },
2674                "room_id": "!galette:saucisse.bzh",
2675                "type": "m.fully_read"
2676            })
2677            .to_string(),
2678        )
2679        .unwrap();
2680        let account_data = vec![read_marker_event; 100];
2681
2682        room_event_cache
2683            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
2684            .await
2685            .unwrap();
2686
2687        // … there's only one read marker update.
2688        assert_matches!(
2689            stream.recv().await.unwrap(),
2690            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
2691        );
2692
2693        assert!(stream.recv().now_or_never().is_none());
2694
        // None, because account data doesn't trigger a generic update.
2696        assert!(generic_stream.recv().now_or_never().is_none());
2697    }
2698
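    /// Helper: returns `true` iff the event with the given ID is currently
    /// loaded in memory (as opposed to merely being present in the store).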
2699    async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
2700        room_event_cache
2701            .rfind_map_event_in_memory_by(|event| {
2702                (event.event_id().as_deref() == Some(event_id)).then_some(())
2703            })
2704            .await
2705            .unwrap()
2706            .is_some()
2707    }
2708}