// Copyright 2026 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod pagination;
mod state;
mod subscriber;
mod updates;

use std::{
    collections::BTreeMap,
    fmt,
    sync::{Arc, atomic::Ordering},
};

use eyeball::SharedObservable;
use matrix_sdk_base::{
    deserialized_responses::AmbiguityChange,
    event_cache::Event,
    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
    EventId, OwnedEventId, OwnedRoomId, RoomId,
    events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
    serde::Raw,
};
pub(super) use state::{LockedRoomEventCacheState, RoomEventCacheStateLockWriteGuard};
pub use subscriber::RoomEventCacheSubscriber;
use tokio::sync::{Notify, broadcast::Receiver, mpsc};
use tracing::{instrument, trace, warn};
pub use updates::{
    RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate,
    RoomEventCacheUpdateSender,
};

use super::{
    super::{AutoShrinkChannelPayload, EventCacheError, EventsOrigin, Result, RoomPagination},
    TimelineVectorDiffs,
    event_linked_chunk::sort_positions_descending,
    thread::pagination::ThreadPagination,
};
use crate::{
    client::WeakClient,
    event_cache::{
        EventFocusThreadMode,
        caches::{event_focused::EventFocusedCache, pagination::SharedPaginationStatus},
    },
    room::WeakRoom,
};

/// A subset of an event cache, for a room.
///
/// Cloning is shallow, and thus is cheap to do.
#[derive(Clone)]
pub struct RoomEventCache {
    inner: Arc<RoomEventCacheInner>,
}

impl fmt::Debug for RoomEventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RoomEventCache").finish_non_exhaustive()
    }
}

impl RoomEventCache {
    /// Create a new [`RoomEventCache`] using the given room and store.
    pub(super) fn new(
        room_id: OwnedRoomId,
        weak_room: WeakRoom,
        state: LockedRoomEventCacheState,
        shared_pagination_status: SharedObservable<SharedPaginationStatus>,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        update_sender: RoomEventCacheUpdateSender,
    ) -> Self {
        Self {
            inner: Arc::new(RoomEventCacheInner::new(
                room_id,
                weak_room,
                state,
                shared_pagination_status,
                auto_shrink_sender,
                update_sender,
            )),
        }
    }

    /// Get the room ID for this [`RoomEventCache`].
    pub fn room_id(&self) -> &RoomId {
        &self.inner.room_id
    }

    /// Read all current events.
    ///
    /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
    /// subscriber.
    pub async fn events(&self) -> Result<Vec<Event>> {
        let state = self.inner.state.read().await?;

        Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
    }

    /// Subscribe to this room's updates, after getting the initial list of
    /// events.
    ///
    /// Use [`RoomEventCache::events`] to get all current events without the
    /// subscriber. Creating, and especially dropping, a
    /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
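    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming `room_event_cache` is an existing
    /// [`RoomEventCache`]; how the updates are applied is left to the caller:
    ///
    /// ```ignore
    /// let (initial_events, mut subscriber) = room_event_cache.subscribe().await?;
    ///
    /// // `initial_events` is the currently loaded timeline; subsequent changes
    /// // arrive as `RoomEventCacheUpdate`s on the subscriber.
    /// while let Ok(update) = subscriber.recv().await {
    ///     // Apply timeline diffs, read marker moves, etc.
    /// }
    /// ```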
    pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
        let state = self.inner.state.read().await?;
        let events =
            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();

        let subscriber_count = state.subscriber_count();
        let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);

        let subscriber = RoomEventCacheSubscriber::new(
            self.inner.update_sender.new_room_receiver(),
            self.inner.room_id.clone(),
            self.inner.auto_shrink_sender.clone(),
            subscriber_count.clone(),
        );

        Ok((events, subscriber))
    }

    /// Subscribe to the thread for a given root event, and get the (possibly
    /// empty) list of events already known for that thread.
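    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming `room_event_cache` is an existing
    /// [`RoomEventCache`] and `thread_root` is the thread root's event ID:
    ///
    /// ```ignore
    /// let (thread_events, mut receiver) =
    ///     room_event_cache.subscribe_to_thread(thread_root).await?;
    ///
    /// // Further updates to the thread arrive as batches of vector diffs.
    /// while let Ok(update) = receiver.recv().await {
    ///     // Apply `update.diffs` to a local copy of the thread timeline.
    /// }
    /// ```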
    pub async fn subscribe_to_thread(
        &self,
        thread_root: OwnedEventId,
    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
        let mut state = self.inner.state.write().await?;

        state.subscribe_to_thread(thread_root).await
    }

    /// Subscribe to the pinned event cache for this room.
    ///
    /// This is a persisted view over the pinned events of a room.
    ///
    /// The pinned events are initially reloaded from storage, and/or fetched
    /// from the network (along with their relations) to bring the view up to
    /// date. The list of pinned events is then kept up-to-date as new events
    /// are pinned and new related events show up from other sources.
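    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming `room_event_cache` is an existing
    /// [`RoomEventCache`]:
    ///
    /// ```ignore
    /// let (pinned_events, mut receiver) =
    ///     room_event_cache.subscribe_to_pinned_events().await?;
    ///
    /// while let Ok(update) = receiver.recv().await {
    ///     // Apply `update.diffs` to a local copy of the pinned events.
    /// }
    /// ```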
    pub async fn subscribe_to_pinned_events(
        &self,
    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
        let state = self.inner.state.read().await?;

        state.subscribe_to_pinned_events(room).await
    }

    /// Create or get an event-focused timeline cache for this room.
    ///
    /// This creates a timeline centered around a specific event (e.g., for
    /// permalinks), in a given mode, supporting both forward and backward
    /// pagination.
    ///
    /// If the focused event is part of a thread, the timeline will
    /// automatically use thread-specific pagination.
    ///
    /// If the thread mode is set to [`EventFocusThreadMode::ForceThread`],
    /// the timeline will be focused on the thread root of the thread the
    /// target event belongs to, or it will consider that the target event
    /// itself is the thread root.
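    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming `room_event_cache` is an existing
    /// [`RoomEventCache`] and `event_id` is the ID of the event to focus on;
    /// the number of context events and the thread mode are illustrative:
    ///
    /// ```ignore
    /// let cache = room_event_cache
    ///     .get_or_create_event_focused_cache(
    ///         event_id,
    ///         // Ask for up to this many events around the target.
    ///         20,
    ///         EventFocusThreadMode::ForceThread,
    ///     )
    ///     .await?;
    /// ```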
    #[instrument(skip(self), fields(room_id = %self.inner.room_id, event_id = %event_id, thread_mode = ?thread_mode))]
    pub async fn get_or_create_event_focused_cache(
        &self,
        event_id: OwnedEventId,
        num_context_events: u16,
        thread_mode: EventFocusThreadMode,
    ) -> Result<EventFocusedCache> {
        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
        let guard = self.inner.state.read().await?;

        // Check if we already have a cache for this event.
        if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
            trace!("the cache was already created, returning it");
            return Ok(cache);
        }

        // Create a new cache.
        let linked_chunk_update_sender = guard.state.linked_chunk_update_sender.clone();

        // Make sure to drop the guard before calling `start_from` below, as it may need
        // to lock the room event cache's state again, when memoizing events
        // received from the network response.
        drop(guard);

        let room_id = room.room_id().to_owned();
        let weak_room = WeakRoom::new(WeakClient::from_client(&room.client()), room_id.clone());

        trace!("creating a fresh event-focused cache");
        let cache = EventFocusedCache::new(weak_room, event_id.clone(), linked_chunk_update_sender);

        // Initialize the cache from the server.
        cache.start_from(room, num_context_events, thread_mode).await?;

        let mut guard = self.inner.state.write().await?;

        // Check again if we already have a cache for this event, just in case there was
        // a race with another caller during initialization.
        if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
            trace!("another cache has been racily created, returning it");
            return Ok(cache);
        }

        // Insert the cache in the map.
        guard.insert_event_focused_cache(event_id, thread_mode, cache.clone());

        Ok(cache)
    }

    /// Get an event-focused cache for this event and thread mode, if it exists.
    ///
    /// Otherwise, returns `None`.
    ///
    /// Use [`Self::get_or_create_event_focused_cache`] to ensure such a
    /// cache exists.
    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
    pub async fn get_event_focused_cache(
        &self,
        event_id: OwnedEventId,
        thread_mode: EventFocusThreadMode,
    ) -> Result<Option<EventFocusedCache>> {
        Ok(self.inner.state.read().await?.get_event_focused_cache(event_id, thread_mode))
    }

    /// Return a [`RoomPagination`] type useful for running back-pagination
    /// queries in the current room.
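    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming `room_event_cache` is an existing
    /// [`RoomEventCache`]:
    ///
    /// ```ignore
    /// let pagination = room_event_cache.pagination();
    ///
    /// // Run a single back-pagination request, asking for a batch of up to 20
    /// // events towards the start of the timeline.
    /// let _outcome = pagination.run_backwards_once(20).await?;
    /// ```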
    pub fn pagination(&self) -> RoomPagination {
        RoomPagination::new(self.inner.clone())
    }

    /// Return a [`ThreadPagination`] type useful for running back-pagination
    /// queries in the `thread_id` thread.
    pub async fn thread_pagination(&self, thread_id: OwnedEventId) -> Result<ThreadPagination> {
        Ok(self.inner.state.write().await?.get_or_reload_thread(thread_id).pagination())
    }

    /// Try to find a single event in this room, starting from the most recent
    /// event.
    ///
    /// The `predicate` receives the current event as its single argument.
    ///
    /// **Warning**! This only looks at the events loaded in the in-memory
    /// linked chunk; it doesn't look inside the storage.
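    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming `room_event_cache` is an existing
    /// [`RoomEventCache`]; this grabs the ID of the most recent in-memory
    /// event, if any:
    ///
    /// ```ignore
    /// let latest_event_id = room_event_cache
    ///     // Returning `Some(_)` stops the search; `None` keeps scanning
    ///     // backwards through the loaded events.
    ///     .rfind_map_event_in_memory_by(|event| event.event_id())
    ///     .await?;
    /// ```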
    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
    where
        P: FnMut(&Event) -> Option<O>,
    {
        Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
    }

    /// Try to find an event by ID in this room.
    ///
    /// It starts by looking into loaded events before looking inside the
    /// storage.
    pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
        Ok(self
            .inner
            .state
            .read()
            .await?
            .find_event(event_id)
            .await
            .ok()
            .flatten()
            .map(|(_loc, event)| event))
    }

    /// Try to find an event by ID in this room, along with its related events.
    ///
    /// You can filter which types of related events to retrieve using
    /// `filter`. `None` will retrieve related events of any type.
    ///
    /// The related events are sorted like this:
    ///
    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
    ///   located at the beginning of the array.
    /// - events present in the linked chunk (be it in memory or in the storage)
    ///   will be sorted according to their ordering in the linked chunk.
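    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming `room_event_cache` is an existing
    /// [`RoomEventCache`] and `event_id` identifies an event in this room;
    /// this fetches the event along with its edits only:
    ///
    /// ```ignore
    /// let filter = Some(vec![RelationType::Replacement]);
    ///
    /// if let Some((event, edits)) =
    ///     room_event_cache.find_event_with_relations(event_id, filter).await?
    /// {
    ///     // `edits` contains out-of-band events first, then events from the
    ///     // linked chunk, in their linked-chunk ordering.
    /// }
    /// ```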
    pub async fn find_event_with_relations(
        &self,
        event_id: &EventId,
        filter: Option<Vec<RelationType>>,
    ) -> Result<Option<(Event, Vec<Event>)>> {
        // Search in all loaded or stored events.
        Ok(self
            .inner
            .state
            .read()
            .await?
            .find_event_with_relations(event_id, filter.clone())
            .await
            .ok()
            .flatten())
    }

    /// Try to find the related events for an event by ID in this room.
    ///
    /// You can filter which types of related events to retrieve using
    /// `filter`. `None` will retrieve related events of any type.
    ///
    /// The related events are sorted like this:
    ///
    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
    ///   located at the beginning of the array.
    /// - events present in the linked chunk (be it in memory or in the storage)
    ///   will be sorted according to their ordering in the linked chunk.
    pub async fn find_event_relations(
        &self,
        event_id: &EventId,
        filter: Option<Vec<RelationType>>,
    ) -> Result<Vec<Event>> {
        // Search in all loaded or stored events.
        self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
    }

    /// Clear all the storage for this [`RoomEventCache`].
    ///
    /// This will get rid of all the events from the linked chunk and persisted
    /// storage.
    pub async fn clear(&self) -> Result<()> {
        // Clear the linked chunk and persisted storage.
        let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;

        // Notify observers about the update.
        self.inner.update_sender.send(
            RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                diffs: updates_as_vector_diffs,
                origin: EventsOrigin::Cache,
            }),
            Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
        );

        Ok(())
    }

    /// Return a reference to the state.
    pub(in super::super) fn state(&self) -> &LockedRoomEventCacheState {
        &self.inner.state
    }

    /// Handle a [`JoinedRoomUpdate`].
    #[instrument(skip_all, fields(room_id = %self.room_id()))]
    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
        self.inner
            .handle_timeline(updates.timeline, updates.ephemeral.clone(), updates.ambiguity_changes)
            .await?;
        self.inner.handle_account_data(updates.account_data);

        Ok(())
    }

    /// Handle a [`LeftRoomUpdate`].
    #[instrument(skip_all, fields(room_id = %self.room_id()))]
    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
        self.inner.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;

        Ok(())
    }

    /// Get a reference to the [`RoomEventCacheUpdateSender`].
    pub(in super::super) fn update_sender(&self) -> &RoomEventCacheUpdateSender {
        &self.inner.update_sender
    }

    /// Handle a single event from the `SendQueue`.
    pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
        self.inner.insert_sent_event_from_send_queue(event).await
    }

    /// Save some events in the event cache, for further retrieval with
    /// [`Self::find_event`].
    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
        match self.inner.state.write().await {
            Ok(mut state_guard) => {
                if let Err(err) = state_guard.save_events(events).await {
                    warn!("couldn't save event in the event cache: {err}");
                }
            }

            Err(err) => {
                warn!("couldn't save event in the event cache: {err}");
            }
        }
    }

    /// Return a nice debug string (a vector of lines) for the linked chunk of
    /// events for this room.
    pub async fn debug_string(&self) -> Vec<String> {
        match self.inner.state.read().await {
            Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
            Err(err) => {
                warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");

                vec![]
            }
        }
    }
}

/// The (non-cloneable) details of the `RoomEventCache`.
pub(super) struct RoomEventCacheInner {
    /// The room id for this room.
    room_id: OwnedRoomId,

    pub weak_room: WeakRoom,

    /// State for this room's event cache.
    pub state: LockedRoomEventCacheState,

    /// A notifier that we received a new pagination token.
    pub pagination_batch_token_notifier: Notify,

    pub shared_pagination_status: SharedObservable<SharedPaginationStatus>,

    /// Sender to the auto-shrink channel.
    ///
    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
    /// more details.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Update sender for this room.
    update_sender: RoomEventCacheUpdateSender,
}

impl RoomEventCacheInner {
    /// Creates a new cache for a room, and subscribes to room updates, so as
    /// to handle new timeline events.
    fn new(
        room_id: OwnedRoomId,
        weak_room: WeakRoom,
        state: LockedRoomEventCacheState,
        shared_pagination_status: SharedObservable<SharedPaginationStatus>,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        update_sender: RoomEventCacheUpdateSender,
    ) -> Self {
        Self {
            room_id,
            weak_room,
            state,
            update_sender,
            pagination_batch_token_notifier: Default::default(),
            auto_shrink_sender,
            shared_pagination_status,
        }
    }

    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
        if account_data.is_empty() {
            return;
        }

        let mut handled_read_marker = false;

        trace!("Handling account data");

        for raw_event in account_data {
            match raw_event.deserialize() {
                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
                    // If duplicated, do not forward the read marker multiple
                    // times, to avoid cluttering the update channel.
                    if handled_read_marker {
                        continue;
                    }

                    handled_read_marker = true;

                    // Propagate to observers. (We ignore the error if there aren't any.)
                    self.update_sender.send(
                        RoomEventCacheUpdate::MoveReadMarkerTo { event_id: ev.content.event_id },
                        None,
                    );
                }

                Ok(_) => {
                    // We're not interested in other room account data updates,
                    // at this point.
                }

                Err(e) => {
                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
                    warn!(event_type, "Failed to deserialize account data: {e}");
                }
            }
        }
    }

    /// Handle a [`Timeline`], i.e. new events received by a sync for this
    /// room.
    async fn handle_timeline(
        &self,
        timeline: Timeline,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        self.handle_timeline_inner(
            self.state.write().await?,
            timeline,
            ephemeral_events,
            ambiguity_changes,
        )
        .await
    }

    /// Handle a single event from the `SendQueue`.
    ///
    /// The event is inserted if and only if the cache is not empty.
    async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
        let state = self.state.write().await?;

        // Insert the event if the room is not empty, otherwise it can break the
        // pagination logic when detecting the start of the timeline because no gap can
        // be inserted properly: it is impossible to compute a `prev_batch` token here.
        if state.room_linked_chunk().events().next().is_some() {
            return self
                .handle_timeline_inner(
                    state,
                    Timeline { limited: false, prev_batch: None, events: vec![event] },
                    Vec::new(),
                    BTreeMap::new(),
                )
                .await;
        }

        Ok(())
    }

    async fn handle_timeline_inner(
        &self,
        mut state: RoomEventCacheStateLockWriteGuard<'_>,
        timeline: Timeline,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        if timeline.events.is_empty()
            && timeline.prev_batch.is_none()
            && ephemeral_events.is_empty()
            && ambiguity_changes.is_empty()
        {
            return Ok(());
        }

        // Add all the events to the backend.
        trace!("adding new events");

        let (stored_prev_batch_token, timeline_event_diffs) =
            state.handle_sync(timeline, &ephemeral_events).await?;

        drop(state);

        // Now that all events have been added, we can trigger the
        // `pagination_token_notifier`.
        if stored_prev_batch_token {
            self.pagination_batch_token_notifier.notify_one();
        }

        // The order matters here: first send the timeline event diffs, and
        // only then the related events (read receipts, etc.).
        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Sync,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        if !ephemeral_events.is_empty() {
            self.update_sender
                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events }, None);
        }

        if !ambiguity_changes.is_empty() {
            self.update_sender
                .send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes }, None);
        }

        Ok(())
    }
}

#[derive(Clone, Copy)]
pub(in super::super) enum PostProcessingOrigin {
    Sync,
    Backpagination,
    #[cfg(feature = "e2e-encryption")]
    Redecryption,
}

#[cfg(test)]
mod tests {
    use matrix_sdk_base::{RoomState, event_cache::Event};
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        RoomId, event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };

    use crate::test_utils::logged_in_client;

    #[async_test]
    async fn test_find_event_by_id_with_edit_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("* An edited event")
                .edit(
                    original_id,
                    RoomMessageEventContentWithoutRelation::text_plain("An edited event"),
                )
                .event_id(related_id)
                .into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_thread_reply_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_reaction_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.reaction(original_id, ":D").event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_poll_response_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_poll_end_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_filtered_relationships() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the associated related event, which reacts to the related event.
        room_event_cache.save_events([associated_related_event]).await;

        let filter = Some(vec![RelationType::Replacement]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // There's only the edit event (an edit event can't have its own edit event).
        assert_eq!(related_events.len(), 1);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);

        // Now we'll filter by thread relations instead; there should be no related events.
        let filter = Some(vec![RelationType::Thread]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");

        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);
        // No Thread related events found
        assert!(related_events.is_empty());
    }

    #[async_test]
    async fn test_find_event_by_id_with_recursive_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the associated related event, which reacts to the related event.
        room_event_cache.save_events([associated_related_event]).await;

        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, None)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // Both the directly related event and the recursively related event are present.
        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);
    }

    async fn assert_relations(
        room_id: &RoomId,
        original_event: Event,
        related_event: Event,
        event_factory: EventFactory,
    ) {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        let original_event_id = original_event.event_id().unwrap();
        room_event_cache.save_events([original_event]).await;

        // Save an unrelated event to check it's not in the related events list.
        let unrelated_id = event_id!("$2");
        room_event_cache
            .save_events([event_factory
                .text_msg("An unrelated event")
                .event_id(unrelated_id)
                .into()])
            .await;

        // Save the related event.
        let related_id = related_event.event_id().unwrap();
        room_event_cache.save_events([related_event]).await;

        let (event, related_events) = room_event_cache
            .find_event_with_relations(&original_event_id, None)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_event_id);

        // There is only the actually related event in the related ones
        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
    }
}

#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
mod timed_tests {
    use std::{ops::Not, sync::Arc};

    use assert_matches::assert_matches;
    use assert_matches2::assert_let;
    use eyeball_im::VectorDiff;
    use futures_util::FutureExt;
    use matrix_sdk_base::{
        RoomState,
        event_cache::{
            Gap,
            store::{EventCacheStore as _, MemoryStore},
        },
        linked_chunk::{
            ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
            lazy_loader::from_all_chunks,
        },
        store::StoreConfig,
        sync::{JoinedRoomUpdate, Timeline},
    };
    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
    use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
    use ruma::{
        EventId, event_id,
        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
        room_id,
        serde::Raw,
        user_id,
    };
    use serde_json::json;
    use tokio::task::yield_now;

    use super::{
        super::{
            super::TimelineVectorDiffs, lock::Reload as _,
            pagination::LoadMoreEventsBackwardsOutcome,
        },
        RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
    };
    use crate::{assert_let_timeout, test_utils::client::MockClientBuilder};

    #[async_test]
    async fn test_write_to_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Propagate an update for a message and a prev-batch token.
        let timeline = Timeline {
            limited: true,
            prev_batch: Some("raclette".to_owned()),
            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
        };

        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        // Check the storage.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 2);

        let mut chunks = linked_chunk.chunks();

        // We start with the gap.
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
            assert_eq!(gap.token, "raclette");
        });

        // Then we have the stored event.
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            let deserialized = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
        });

        // That's all, folks!
        assert!(chunks.next().is_none());
    }

    #[async_test]
    async fn test_write_to_storage_strips_bundled_relations() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Propagate an update for a message with bundled relations.
        let ev = f
            .text_msg("hey yo")
            .sender(*ALICE)
            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
            .into_event();

        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };

        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        // The in-memory linked chunk keeps the bundled relation.
        {
            let events = room_event_cache.events().await.unwrap();

            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(
                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
            );

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_some());
        }

        // The one in storage does not.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 1);

        let mut chunks = linked_chunk.chunks();
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_none());
        });

        // That's all, folks!
        assert!(chunks.next().is_none());
    }

    #[async_test]
    async fn test_clear() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { token: "comté".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // The room knows about all cached events.
        {
            assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
            assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
        }

        // But only some of the events are loaded from the store.
        {
            // The room must contain only one event because only one chunk has been loaded.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].event_id().unwrap(), event_id2);

            assert!(stream.is_empty());
        }

        // Let's load more chunks to load all events.
        {
            room_event_cache.pagination().run_backwards_once(20).await.unwrap();

            assert_let_timeout!(
                Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                    stream.recv()
            );
            assert_eq!(diffs.len(), 1);
            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
                // Here you are `event_id1`!
                assert_eq!(event.event_id().unwrap(), event_id1);
            });

            assert!(stream.is_empty());

            assert_let_timeout!(
                Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
                    generic_stream.recv()
            );
            assert_eq!(room_id, expected_room_id);
            assert!(generic_stream.is_empty());
        }

        // After clearing,…
        room_event_cache.clear().await.unwrap();

        // … we get an update that the content has been cleared.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_let!(VectorDiff::Clear = &diffs[0]);

        // … same with a generic update.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
        );
        assert_eq!(received_room_id, room_id);
        assert!(generic_stream.is_empty());

        // Individual events are not forgotten by the event cache after clearing a
        // room.
        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());

        // But their presence in a linked chunk is forgotten.
        let items = room_event_cache.events().await.unwrap();
        assert!(items.is_empty());

        // The event cache store has forgotten them too.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        // Note: while the event cache store could return `None` here, clearing it will
        // reset it to its initial form, maintaining the invariant that it
        // contains a single items chunk that's empty.
        assert_eq!(linked_chunk.num_items(), 0);
    }

    #[async_test]
    async fn test_load_from_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { token: "cheddar".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        // Let's check whether the generic updates are received for the initialisation.
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // The room event cache has been loaded. A generic update must have been
        // triggered.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );
        assert!(generic_stream.is_empty());

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();

        // The initial items contain one event because only the last chunk is loaded by
        // default.
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].event_id().unwrap(), event_id2);
        assert!(stream.is_empty());

        // The event cache knows about all events though, even if they aren't loaded.
1346        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
1347        assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
1348
1349        // Let's paginate to load more events.
1350        room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1351
1352        assert_let_timeout!(
1353            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1354                stream.recv()
1355        );
1356        assert_eq!(diffs.len(), 1);
1357        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1358            assert_eq!(event.event_id().unwrap(), event_id1);
1359        });
1360
1361        assert!(stream.is_empty());
1362
1363        // A generic update is triggered too.
1364        assert_matches!(
1365            generic_stream.recv().await,
1366            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1367                assert_eq!(expected_room_id, room_id);
1368            }
1369        );
1370        assert!(generic_stream.is_empty());
1371
1372        // A new update with one of these events leads to deduplication.
1373        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
1374
1375        room_event_cache
1376            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1377            .await
1378            .unwrap();
1379
1380        // Just checking the generic update is correct. There is a duplicate event, so
1381        // no generic changes whatsoever!
1382        assert!(generic_stream.recv().now_or_never().is_none());
1383
1384        // The stream doesn't report these changes *yet*. Use the items vector given
1385        // when subscribing, to check that the items correspond to their new
1386        // positions. The duplicated item is removed (so it's not the first
1387        // element anymore), and it's added to the back of the list.
1388        let items = room_event_cache.events().await.unwrap();
1389        assert_eq!(items.len(), 2);
1390        assert_eq!(items[0].event_id().unwrap(), event_id1);
1391        assert_eq!(items[1].event_id().unwrap(), event_id2);
1392    }
1393
1394    #[async_test]
1395    async fn test_load_from_storage_resilient_to_failure() {
1396        let room_id = room_id!("!fondue:patate.ch");
1397        let event_cache_store = Arc::new(MemoryStore::new());
1398
1399        let event = EventFactory::new()
1400            .room(room_id)
1401            .sender(user_id!("@ben:saucisse.bzh"))
1402            .text_msg("foo")
1403            .event_id(event_id!("$42"))
1404            .into_event();
1405
1406        // Prefill the store with invalid data: two chunks that form a cycle.
1407        event_cache_store
1408            .handle_linked_chunk_updates(
1409                LinkedChunkId::Room(room_id),
1410                vec![
1411                    Update::NewItemsChunk {
1412                        previous: None,
1413                        new: ChunkIdentifier::new(0),
1414                        next: None,
1415                    },
1416                    Update::PushItems {
1417                        at: Position::new(ChunkIdentifier::new(0), 0),
1418                        items: vec![event],
1419                    },
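                    // This next chunk's `next` points back at chunk 0, which is also
                    // its `previous`: a 0 → 1 → 0 cycle that a well-formed linked
                    // chunk must never contain.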
1420                    Update::NewItemsChunk {
1421                        previous: Some(ChunkIdentifier::new(0)),
1422                        new: ChunkIdentifier::new(1),
1423                        next: Some(ChunkIdentifier::new(0)),
1424                    },
1425                ],
1426            )
1427            .await
1428            .unwrap();
1429
1430        let client = MockClientBuilder::new(None)
1431            .on_builder(|builder| {
1432                builder.store_config(
1433                    StoreConfig::new(CrossProcessLockConfig::multi_process("holder"))
1434                        .event_cache_store(event_cache_store.clone()),
1435                )
1436            })
1437            .build()
1438            .await;
1439
1440        let event_cache = client.event_cache();
1441
1442        // Don't forget to subscribe and like.
1443        event_cache.subscribe().unwrap();
1444
1445        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1446        let room = client.get_room(room_id).unwrap();
1447
1448        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1449
1450        let items = room_event_cache.events().await.unwrap();
1451
1452        // Because the persisted content was invalid, the room store is reset: there are
1453        // no events in the cache.
1454        assert!(items.is_empty());
1455
1456        // Storage doesn't contain anything. It would also be valid for it to contain
1457        // a single, initially empty items chunk.
1458        let raw_chunks =
1459            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
1460        assert!(raw_chunks.is_empty());
1461    }
1462
1463    #[async_test]
1464    async fn test_no_useless_gaps() {
1465        let room_id = room_id!("!galette:saucisse.bzh");
1466
1467        let client = MockClientBuilder::new(None).build().await;
1468
1469        let event_cache = client.event_cache();
1470        event_cache.subscribe().unwrap();
1471
1472        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1473        let room = client.get_room(room_id).unwrap();
1474        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1475        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1476
1477        let f = EventFactory::new().room(room_id).sender(*ALICE);
1478
1479        // Propagate an update including a limited timeline with one message and a
1480        // prev-batch token.
1481        room_event_cache
1482            .handle_joined_room_update(JoinedRoomUpdate {
1483                timeline: Timeline {
1484                    limited: true,
1485                    prev_batch: Some("raclette".to_owned()),
1486                    events: vec![f.text_msg("hey yo").into_event()],
1487                },
1488                ..Default::default()
1489            })
1490            .await
1491            .unwrap();
1492
1493        // Just checking the generic update is correct.
1494        assert_matches!(
1495            generic_stream.recv().await,
1496            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1497                assert_eq!(expected_room_id, room_id);
1498            }
1499        );
1500        assert!(generic_stream.is_empty());
1501
1502        {
1503            let state = room_event_cache.inner.state.read().await.unwrap();
1504
1505            let mut num_gaps = 0;
1506            let mut num_events = 0;
1507
1508            for c in state.room_linked_chunk().chunks() {
1509                match c.content() {
1510                    ChunkContent::Items(items) => num_events += items.len(),
1511                    ChunkContent::Gap(_) => num_gaps += 1,
1512                }
1513            }
1514
1515            // The limited sync unloads the chunk, so it appears as if there are only
1516            // the events, and no gap.
1517            assert_eq!(num_gaps, 0);
1518            assert_eq!(num_events, 1);
1519        }
1520
1521        // But if I manually reload more of the chunk, the gap will be present.
1522        assert_matches!(
1523            room_event_cache.pagination().load_more_events_backwards().await.unwrap(),
1524            LoadMoreEventsBackwardsOutcome::Gap { .. }
1525        );
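        // The loaded chunk is the gap holding the `raclette` prev-batch token stored
        // above, as the assertions below confirm.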
1526
1527        {
1528            let state = room_event_cache.inner.state.read().await.unwrap();
1529
1530            let mut num_gaps = 0;
1531            let mut num_events = 0;
1532
1533            for c in state.room_linked_chunk().chunks() {
1534                match c.content() {
1535                    ChunkContent::Items(items) => num_events += items.len(),
1536                    ChunkContent::Gap(_) => num_gaps += 1,
1537                }
1538            }
1539
1540            // The gap must have been stored.
1541            assert_eq!(num_gaps, 1);
1542            assert_eq!(num_events, 1);
1543        }
1544
1545        // Now, propagate an update for another message, but the timeline isn't limited
1546        // this time.
1547        room_event_cache
1548            .handle_joined_room_update(JoinedRoomUpdate {
1549                timeline: Timeline {
1550                    limited: false,
1551                    prev_batch: Some("fondue".to_owned()),
1552                    events: vec![f.text_msg("sup").into_event()],
1553                },
1554                ..Default::default()
1555            })
1556            .await
1557            .unwrap();
1558
1559        // Just checking the generic update is correct.
1560        assert_matches!(
1561            generic_stream.recv().await,
1562            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
1563                assert_eq!(expected_room_id, room_id);
1564            }
1565        );
1566        assert!(generic_stream.is_empty());
1567
1568        {
1569            let state = room_event_cache.inner.state.read().await.unwrap();
1570
1571            let mut num_gaps = 0;
1572            let mut num_events = 0;
1573
1574            for c in state.room_linked_chunk().chunks() {
1575                match c.content() {
1576                    ChunkContent::Items(items) => num_events += items.len(),
1577                    ChunkContent::Gap(gap) => {
1578                        assert_eq!(gap.token, "raclette");
1579                        num_gaps += 1;
1580                    }
1581                }
1582            }
1583
1584            // There's only the previous gap, no new ones.
1585            assert_eq!(num_gaps, 1);
1586            assert_eq!(num_events, 2);
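            // Final conceptual layout: [gap «raclette»] <-> [«hey yo», «sup»]. The
            // `fondue` token was discarded since the timeline wasn't limited, hence
            // no useless gap.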
1587        }
1588    }
1589
1590    #[async_test]
1591    async fn test_shrink_to_last_chunk() {
1592        let room_id = room_id!("!galette:saucisse.bzh");
1593
1594        let client = MockClientBuilder::new(None).build().await;
1595
1596        let f = EventFactory::new().room(room_id);
1597
1598        let evid1 = event_id!("$1");
1599        let evid2 = event_id!("$2");
1600
1601        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1602        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1603
1604        // Fill the event cache store with an initial linked chunk made of two event chunks.
1605        {
1606            client
1607                .event_cache_store()
1608                .lock()
1609                .await
1610                .expect("Could not acquire the event cache lock")
1611                .as_clean()
1612                .expect("Could not acquire a clean event cache lock")
1613                .handle_linked_chunk_updates(
1614                    LinkedChunkId::Room(room_id),
1615                    vec![
1616                        Update::NewItemsChunk {
1617                            previous: None,
1618                            new: ChunkIdentifier::new(0),
1619                            next: None,
1620                        },
1621                        Update::PushItems {
1622                            at: Position::new(ChunkIdentifier::new(0), 0),
1623                            items: vec![ev1],
1624                        },
1625                        Update::NewItemsChunk {
1626                            previous: Some(ChunkIdentifier::new(0)),
1627                            new: ChunkIdentifier::new(1),
1628                            next: None,
1629                        },
1630                        Update::PushItems {
1631                            at: Position::new(ChunkIdentifier::new(1), 0),
1632                            items: vec![ev2],
1633                        },
1634                    ],
1635                )
1636                .await
1637                .unwrap();
1638        }
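        // Conceptual store layout after the prefill: [ev1] <-> [ev2], two chunks of
        // one event each; lazy loading should only bring in the last one.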
1639
1640        let event_cache = client.event_cache();
1641        event_cache.subscribe().unwrap();
1642
1643        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1644        let room = client.get_room(room_id).unwrap();
1645        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1646
1647        // Sanity check: lazily loaded, so only includes one item at start.
1648        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
1649        assert_eq!(events.len(), 1);
1650        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1651        assert!(stream.is_empty());
1652
1653        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1654
1655        // Force loading the full linked chunk by back-paginating.
1656        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1657        assert_eq!(outcome.events.len(), 1);
1658        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1659        assert!(outcome.reached_start);
1660
1661        // We also get an update about the loading from the store.
1662        assert_let_timeout!(
1663            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1664                stream.recv()
1665        );
1666        assert_eq!(diffs.len(), 1);
1667        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1668            assert_eq!(value.event_id().as_deref(), Some(evid1));
1669        });
1670
1671        assert!(stream.is_empty());
1672
1673        // Same for the generic update.
1674        assert_let_timeout!(
1675            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1676        );
1677        assert_eq!(expected_room_id, room_id);
1678        assert!(generic_stream.is_empty());
1679
1680        // Shrink the linked chunk to the last chunk.
1681        room_event_cache
1682            .inner
1683            .state
1684            .write()
1685            .await
1686            .unwrap()
1687            .reload()
1688            .await
1689            .expect("shrinking should succeed");
1690
1691        // We receive updates about the changes to the linked chunk.
1692        assert_let_timeout!(
1693            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1694                stream.recv()
1695        );
1696        assert_eq!(diffs.len(), 2);
1697        assert_matches!(&diffs[0], VectorDiff::Clear);
1698        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
1699            assert_eq!(values.len(), 1);
1700            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
1701        });
1702
1703        assert!(stream.is_empty());
1704
1705        // A generic update has been received.
1706        assert_let_timeout!(Ok(RoomEventCacheGenericUpdate { .. }) = generic_stream.recv());
1707        assert!(generic_stream.is_empty());
1708
1709        // When reading the events, we do get only the last one.
1710        let events = room_event_cache.events().await.unwrap();
1711        assert_eq!(events.len(), 1);
1712        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
1713
1714        // But if we back-paginate, we don't need network access to find out about
1715        // the previous event.
1716        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1717        assert_eq!(outcome.events.len(), 1);
1718        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1719        assert!(outcome.reached_start);
1720    }
1721
1722    #[async_test]
1723    async fn test_room_ordering() {
1724        let room_id = room_id!("!galette:saucisse.bzh");
1725
1726        let client = MockClientBuilder::new(None).build().await;
1727
1728        let f = EventFactory::new().room(room_id).sender(*ALICE);
1729
1730        let evid1 = event_id!("$1");
1731        let evid2 = event_id!("$2");
1732        let evid3 = event_id!("$3");
1733
1734        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
1735        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1736        let ev3 = f.text_msg("yo").event_id(evid3).into_event();
1737
1738        // Fill the event cache store with an initial linked chunk made of two event chunks.
1739        {
1740            client
1741                .event_cache_store()
1742                .lock()
1743                .await
1744                .expect("Could not acquire the event cache lock")
1745                .as_clean()
1746                .expect("Could not acquire a clean event cache lock")
1747                .handle_linked_chunk_updates(
1748                    LinkedChunkId::Room(room_id),
1749                    vec![
1750                        Update::NewItemsChunk {
1751                            previous: None,
1752                            new: ChunkIdentifier::new(0),
1753                            next: None,
1754                        },
1755                        Update::PushItems {
1756                            at: Position::new(ChunkIdentifier::new(0), 0),
1757                            items: vec![ev1, ev2],
1758                        },
1759                        Update::NewItemsChunk {
1760                            previous: Some(ChunkIdentifier::new(0)),
1761                            new: ChunkIdentifier::new(1),
1762                            next: None,
1763                        },
1764                        Update::PushItems {
1765                            at: Position::new(ChunkIdentifier::new(1), 0),
1766                            items: vec![ev3.clone()],
1767                        },
1768                    ],
1769                )
1770                .await
1771                .unwrap();
1772        }
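        // Conceptual store layout: chunk 0 = [ev1, ev2], chunk 1 = [ev3], giving the
        // events the global orderings 0, 1 and 2 respectively.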
1773
1774        let event_cache = client.event_cache();
1775        event_cache.subscribe().unwrap();
1776
1777        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1778        let room = client.get_room(room_id).unwrap();
1779        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1780
1781        // Initially, the linked chunk only contains the last chunk, so only ev3 is
1782        // loaded.
1783        {
1784            let state = room_event_cache.inner.state.read().await.unwrap();
1785            let room_linked_chunk = state.room_linked_chunk();
1786
1787            // But we can get the order of ev1.
1788            assert_eq!(
1789                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1790                Some(0)
1791            );
1792
1793            // And that of ev2 as well.
1794            assert_eq!(
1795                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1796                Some(1)
1797            );
1798
1799            // ev3, which is loaded, also has a known ordering.
1800            let mut events = room_linked_chunk.events();
1801            let (pos, ev) = events.next().unwrap();
1802            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
1803            assert_eq!(ev.event_id().as_deref(), Some(evid3));
1804            assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1805
1806            // No other loaded events.
1807            assert!(events.next().is_none());
1808        }
1809
1810        // Force loading the full linked chunk by back-paginating.
1811        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1812        assert!(outcome.reached_start);
1813
1814        // All events are now loaded, so their order is precisely their enumerated index
1815        // in a linear iteration.
1816        {
1817            let state = room_event_cache.inner.state.read().await.unwrap();
1818            let room_linked_chunk = state.room_linked_chunk();
1819
1820            for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
1821                assert_eq!(room_linked_chunk.event_order(pos), Some(i));
1822            }
1823        }
1824
1825        // Handle a gappy sync with two events (including one duplicate, so
1826        // deduplication kicks in), so that the linked chunk is shrunk to the
1827        // last chunk and, as a result, ends up containing only the last two
1828        // events.
1829        let evid4 = event_id!("$4");
1830        room_event_cache
1831            .handle_joined_room_update(JoinedRoomUpdate {
1832                timeline: Timeline {
1833                    limited: true,
1834                    prev_batch: Some("fondue".to_owned()),
1835                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
1836                },
1837                ..Default::default()
1838            })
1839            .await
1840            .unwrap();
1841
1842        {
1843            let state = room_event_cache.inner.state.read().await.unwrap();
1844            let room_linked_chunk = state.room_linked_chunk();
1845
1846            // After the shrink, only evid3 and evid4 are loaded.
1847            let mut events = room_linked_chunk.events();
1848
1849            let (pos, ev) = events.next().unwrap();
1850            assert_eq!(ev.event_id().as_deref(), Some(evid3));
1851            assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1852
1853            let (pos, ev) = events.next().unwrap();
1854            assert_eq!(ev.event_id().as_deref(), Some(evid4));
1855            assert_eq!(room_linked_chunk.event_order(pos), Some(3));
1856
1857            // No other loaded events.
1858            assert!(events.next().is_none());
1859
1860            // But we can still get the order of previous events.
1861            assert_eq!(
1862                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1863                Some(0)
1864            );
1865            assert_eq!(
1866                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1867                Some(1)
1868            );
1869
1870            // ev3 doesn't have an order with its previous position, since it's been
1871            // deduplicated.
1872            assert_eq!(
1873                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
1874                None
1875            );
1876        }
1877    }
1878
1879    #[async_test]
1880    async fn test_auto_shrink_after_all_subscribers_are_gone() {
1881        let room_id = room_id!("!galette:saucisse.bzh");
1882
1883        let client = MockClientBuilder::new(None).build().await;
1884
1885        let f = EventFactory::new().room(room_id);
1886
1887        let evid1 = event_id!("$1");
1888        let evid2 = event_id!("$2");
1889
1890        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
1891        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1892
1893        // Fill the event cache store with an initial linked chunk made of two event chunks.
1894        {
1895            client
1896                .event_cache_store()
1897                .lock()
1898                .await
1899                .expect("Could not acquire the event cache lock")
1900                .as_clean()
1901                .expect("Could not acquire a clean event cache lock")
1902                .handle_linked_chunk_updates(
1903                    LinkedChunkId::Room(room_id),
1904                    vec![
1905                        Update::NewItemsChunk {
1906                            previous: None,
1907                            new: ChunkIdentifier::new(0),
1908                            next: None,
1909                        },
1910                        Update::PushItems {
1911                            at: Position::new(ChunkIdentifier::new(0), 0),
1912                            items: vec![ev1],
1913                        },
1914                        Update::NewItemsChunk {
1915                            previous: Some(ChunkIdentifier::new(0)),
1916                            new: ChunkIdentifier::new(1),
1917                            next: None,
1918                        },
1919                        Update::PushItems {
1920                            at: Position::new(ChunkIdentifier::new(1), 0),
1921                            items: vec![ev2],
1922                        },
1923                    ],
1924                )
1925                .await
1926                .unwrap();
1927        }
1928
1929        let event_cache = client.event_cache();
1930        event_cache.subscribe().unwrap();
1931
1932        client.base_client().get_or_create_room(room_id, RoomState::Joined);
1933        let room = client.get_room(room_id).unwrap();
1934        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1935
1936        // Sanity check: lazily loaded, so only includes one item at start.
1937        let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
1938        assert_eq!(events1.len(), 1);
1939        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
1940        assert!(stream1.is_empty());
1941
1942        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1943
1944        // Force loading the full linked chunk by back-paginating.
1945        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1946        assert_eq!(outcome.events.len(), 1);
1947        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
1948        assert!(outcome.reached_start);
1949
1950        // We also get an update about the loading from the store. Consume it, for
1951        // this test's sake.
1952        assert_let_timeout!(
1953            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1954                stream1.recv()
1955        );
1956        assert_eq!(diffs.len(), 1);
1957        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
1958            assert_eq!(value.event_id().as_deref(), Some(evid1));
1959        });
1960
1961        assert!(stream1.is_empty());
1962
1963        assert_let_timeout!(
1964            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1965        );
1966        assert_eq!(expected_room_id, room_id);
1967        assert!(generic_stream.is_empty());
1968
1969        // Have another subscriber.
1970        // Since it's not the first one, and the previous one loaded some more events,
1971        // the second subscriber sees them all.
1972        let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
1973        assert_eq!(events2.len(), 2);
1974        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
1975        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
1976        assert!(stream2.is_empty());
1977
1978        // Drop the first stream, and wait a bit.
1979        drop(stream1);
1980        yield_now().await;
1981
1982        // The second stream remains undisturbed.
1983        assert!(stream2.is_empty());
1984
1985        // Now drop the second stream, and wait a bit.
1986        drop(stream2);
1987        yield_now().await;
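        // (The auto-shrink presumably runs on a background task fed by the
        // auto-shrink channel; yielding gives it a chance to process the
        // notification sent when the last subscriber was dropped.)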
1988
1989        // The linked chunk must have auto-shrunk by now.
1990
1991        {
1992            // Check the inner state: there's no more shared auto-shrinker.
1993            let state = room_event_cache.inner.state.read().await.unwrap();
1994            assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
1995        }
1996
1997        // Getting the events will only give us the latest chunk.
1998        let events3 = room_event_cache.events().await.unwrap();
1999        assert_eq!(events3.len(), 1);
2000        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
2001    }
2002
2003    #[async_test]
2004    async fn test_rfind_map_event_in_memory_by() {
2005        let user_id = user_id!("@mnt_io:matrix.org");
2006        let room_id = room_id!("!raclette:patate.ch");
2007        let client = MockClientBuilder::new(None).build().await;
2008
2009        let event_factory = EventFactory::new().room(room_id);
2010
2011        let event_id_0 = event_id!("$ev0");
2012        let event_id_1 = event_id!("$ev1");
2013        let event_id_2 = event_id!("$ev2");
2014        let event_id_3 = event_id!("$ev3");
2015
2016        let event_0 =
2017            event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
2018        let event_1 =
2019            event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
2020        let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
2021        let event_3 =
2022            event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
2023
2024        // Fill the event cache store with an initial linked chunk of 2 chunks, and 4
2025        // events.
2026        {
2027            client
2028                .event_cache_store()
2029                .lock()
2030                .await
2031                .expect("Could not acquire the event cache lock")
2032                .as_clean()
2033                .expect("Could not acquire a clean event cache lock")
2034                .handle_linked_chunk_updates(
2035                    LinkedChunkId::Room(room_id),
2036                    vec![
2037                        Update::NewItemsChunk {
2038                            previous: None,
2039                            new: ChunkIdentifier::new(0),
2040                            next: None,
2041                        },
2042                        Update::PushItems {
2043                            at: Position::new(ChunkIdentifier::new(0), 0),
2044                            items: vec![event_3],
2045                        },
2046                        Update::NewItemsChunk {
2047                            previous: Some(ChunkIdentifier::new(0)),
2048                            new: ChunkIdentifier::new(1),
2049                            next: None,
2050                        },
2051                        Update::PushItems {
2052                            at: Position::new(ChunkIdentifier::new(1), 0),
2053                            items: vec![event_0, event_1, event_2],
2054                        },
2055                    ],
2056                )
2057                .await
2058                .unwrap();
2059        }
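        // Conceptual store layout: chunk 0 = [event_3], chunk 1 = [event_0, event_1,
        // event_2]; only chunk 1 is expected to be loaded in memory at first.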
2060
2061        let event_cache = client.event_cache();
2062        event_cache.subscribe().unwrap();
2063
2064        client.base_client().get_or_create_room(room_id, RoomState::Joined);
2065        let room = client.get_room(room_id).unwrap();
2066        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2067
2068        // Look for an event from `BOB`: it must be `event_0`.
2069        assert_matches!(
2070            room_event_cache
2071                .rfind_map_event_in_memory_by(|event| {
2072                    (event.sender().as_deref() == Some(*BOB)).then(|| event.event_id())
2073                })
2074                .await,
2075            Ok(Some(event_id)) => {
2076                assert_eq!(event_id.as_deref(), Some(event_id_0));
2077            }
2078        );
2079
2080        // Look for an event from `ALICE`: it must be `event_2`, not `event_1`,
2081        // because events are searched in reverse order.
2082        assert_matches!(
2083            room_event_cache
2084                .rfind_map_event_in_memory_by(|event| {
2085                    (event.sender().as_deref() == Some(*ALICE)).then(|| event.event_id())
2086                })
2087                .await,
2088            Ok(Some(event_id)) => {
2089                assert_eq!(event_id.as_deref(), Some(event_id_2));
2090            }
2091        );
2092
2093        // Look for an event that is inside the storage, but not loaded.
2094        assert!(
2095            room_event_cache
2096                .rfind_map_event_in_memory_by(|event| {
2097                    (event.sender().as_deref() == Some(user_id)).then(|| event.event_id())
2098                })
2099                .await
2100                .unwrap()
2101                .is_none()
2102        );
2103
2104        // Look for an event that doesn't exist.
2105        assert!(
2106            room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap().is_none()
2107        );
2108    }
2109
2110    #[async_test]
2111    async fn test_reload_when_dirty() {
2112        let user_id = user_id!("@mnt_io:matrix.org");
2113        let room_id = room_id!("!raclette:patate.ch");
2114
2115        // The storage shared by the two clients.
2116        let event_cache_store = MemoryStore::new();
2117
2118        // Client for the process 0.
2119        let client_p0 = MockClientBuilder::new(None)
2120            .on_builder(|builder| {
2121                builder.store_config(
2122                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2123                        .event_cache_store(event_cache_store.clone()),
2124                )
2125            })
2126            .build()
2127            .await;
2128
2129        // Client for the process 1.
2130        let client_p1 = MockClientBuilder::new(None)
2131            .on_builder(|builder| {
2132                builder.store_config(
2133                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2134                        .event_cache_store(event_cache_store),
2135                )
2136            })
2137            .build()
2138            .await;
2139
2140        let event_factory = EventFactory::new().room(room_id).sender(user_id);
2141
2142        let ev_id_0 = event_id!("$ev_0");
2143        let ev_id_1 = event_id!("$ev_1");
2144
2145        let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
2146        let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
2147
2148        // Add events to the storage (shared by the two clients!).
2149        client_p0
2150            .event_cache_store()
2151            .lock()
2152            .await
2153            .expect("[p0] Could not acquire the event cache lock")
2154            .as_clean()
2155            .expect("[p0] Could not acquire a clean event cache lock")
2156            .handle_linked_chunk_updates(
2157                LinkedChunkId::Room(room_id),
2158                vec![
2159                    Update::NewItemsChunk {
2160                        previous: None,
2161                        new: ChunkIdentifier::new(0),
2162                        next: None,
2163                    },
2164                    Update::PushItems {
2165                        at: Position::new(ChunkIdentifier::new(0), 0),
2166                        items: vec![ev_0],
2167                    },
2168                    Update::NewItemsChunk {
2169                        previous: Some(ChunkIdentifier::new(0)),
2170                        new: ChunkIdentifier::new(1),
2171                        next: None,
2172                    },
2173                    Update::PushItems {
2174                        at: Position::new(ChunkIdentifier::new(1), 0),
2175                        items: vec![ev_1],
2176                    },
2177                ],
2178            )
2179            .await
2180            .unwrap();
2181
2182        // Subscribe the event caches, and create the room.
2183        let (room_event_cache_p0, room_event_cache_p1) = {
2184            let event_cache_p0 = client_p0.event_cache();
2185            event_cache_p0.subscribe().unwrap();
2186
2187            let event_cache_p1 = client_p1.event_cache();
2188            event_cache_p1.subscribe().unwrap();
2189
2190            client_p0.base_client().get_or_create_room(room_id, RoomState::Joined);
2191            client_p1.base_client().get_or_create_room(room_id, RoomState::Joined);
2192
2193            let (room_event_cache_p0, _drop_handles) =
2194                client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
2195            let (room_event_cache_p1, _drop_handles) =
2196                client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
2197
2198            (room_event_cache_p0, room_event_cache_p1)
2199        };
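        // From here on, both processes point at the same store but hold distinct
        // cross-process locks: whenever one of them acquires the lock, the other's
        // view is marked dirty and must be reloaded on its next access.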
2200
2201        // Okay. We are ready for the test!
2202        //
2203        // First off, let's check `room_event_cache_p0` has access to the first event
2204        // loaded in-memory, then do a pagination, and see more events.
2205        let mut updates_stream_p0 = {
2206            let room_event_cache = &room_event_cache_p0;
2207
2208            let (initial_updates, mut updates_stream) =
2209                room_event_cache_p0.subscribe().await.unwrap();
2210
2211            // Initial updates contain `ev_id_1` only.
2212            assert_eq!(initial_updates.len(), 1);
2213            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2214            assert!(updates_stream.is_empty());
2215
2216            // `ev_id_1` must be loaded in memory.
2217            assert!(event_loaded(room_event_cache, ev_id_1).await);
2218
2219            // `ev_id_0` must NOT be loaded in memory.
2220            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2221
2222            // Load one more event with a backpagination.
2223            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2224
2225            // A new update for `ev_id_0` must be present.
2226            assert_matches!(
2227                updates_stream.recv().await.unwrap(),
2228                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2229                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
2230                    assert_matches!(
2231                        &diffs[0],
2232                        VectorDiff::Insert { index: 0, value: event } => {
2233                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2234                        }
2235                    );
2236                }
2237            );
2238
2239            // `ev_id_0` must now be loaded in memory.
2240            assert!(event_loaded(room_event_cache, ev_id_0).await);
2241
2242            updates_stream
2243        };
2244
2245        // Second, let's check `room_event_cache_p1` has the same accesses.
2246        let mut updates_stream_p1 = {
2247            let room_event_cache = &room_event_cache_p1;
2248            let (initial_updates, mut updates_stream) =
2249                room_event_cache_p1.subscribe().await.unwrap();
2250
2251            // Initial updates contain `ev_id_1` only.
2252            assert_eq!(initial_updates.len(), 1);
2253            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2254            assert!(updates_stream.is_empty());
2255
2256            // `ev_id_1` must be loaded in memory.
2257            assert!(event_loaded(room_event_cache, ev_id_1).await);
2258
2259            // `ev_id_0` must NOT be loaded in memory.
2260            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2261
2262            // Load one more event with a backpagination.
2263            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2264
2265            // A new update for `ev_id_0` must be present.
2266            assert_matches!(
2267                updates_stream.recv().await.unwrap(),
2268                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2269                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
2270                    assert_matches!(
2271                        &diffs[0],
2272                        VectorDiff::Insert { index: 0, value: event } => {
2273                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2274                        }
2275                    );
2276                }
2277            );
2278
2279            // `ev_id_0` must now be loaded in memory.
2280            assert!(event_loaded(room_event_cache, ev_id_0).await);
2281
2282            updates_stream
2283        };
2284
2285        // Do this a couple of times, for fun.
2286        for _ in 0..3 {
2287            // Third, because `room_event_cache_p1` has locked the store, the lock
2288            // is dirty for `room_event_cache_p0`, so it will shrink to its last
2289            // chunk!
2290            {
2291                let room_event_cache = &room_event_cache_p0;
2292                let updates_stream = &mut updates_stream_p0;
2293
2294                // `ev_id_1` must be loaded in memory, just like before.
2295                assert!(event_loaded(room_event_cache, ev_id_1).await);
2296
2297                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
2298                // state has been reloaded to its last chunk.
2299                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2300
2301                // The reload can be observed via the updates too.
2302                assert_matches!(
2303                    updates_stream.recv().await.unwrap(),
2304                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2305                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2306                        assert_matches!(&diffs[0], VectorDiff::Clear);
2307                        assert_matches!(
2308                            &diffs[1],
2309                            VectorDiff::Append { values: events } => {
2310                                assert_eq!(events.len(), 1);
2311                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2312                            }
2313                        );
2314                    }
2315                );
2316
2317                // Load one more event with a backpagination.
2318                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2319
2320                // `ev_id_0` must now be loaded in memory.
2321                assert!(event_loaded(room_event_cache, ev_id_0).await);
2322
2323                // The pagination can be observed via the updates too.
2324                assert_matches!(
2325                    updates_stream.recv().await.unwrap(),
2326                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2327                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2328                        assert_matches!(
2329                            &diffs[0],
2330                            VectorDiff::Insert { index: 0, value: event } => {
2331                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2332                            }
2333                        );
2334                    }
2335                );
2336            }
2337
2338            // Fourth, because `room_event_cache_p0` has locked the store again, the lock
2339            // is dirty for `room_event_cache_p1` too, so it will shrink to its last
2340            // chunk!
2341            {
2342                let room_event_cache = &room_event_cache_p1;
2343                let updates_stream = &mut updates_stream_p1;
2344
2345                // `ev_id_1` must be loaded in memory, just like before.
2346                assert!(event_loaded(room_event_cache, ev_id_1).await);
2347
2348                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
2349                // state has shrunk to its last chunk.
2350                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2351
2352                // The reload can be observed via the updates too.
2353                assert_matches!(
2354                    updates_stream.recv().await.unwrap(),
2355                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2356                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2357                        assert_matches!(&diffs[0], VectorDiff::Clear);
2358                        assert_matches!(
2359                            &diffs[1],
2360                            VectorDiff::Append { values: events } => {
2361                                assert_eq!(events.len(), 1);
2362                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2363                            }
2364                        );
2365                    }
2366                );
2367
2368                // Load one more event with a backpagination.
2369                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2370
2371                // `ev_id_0` must now be loaded in memory.
2372                assert!(event_loaded(room_event_cache, ev_id_0).await);
2373
2374                // The pagination can be observed via the updates too.
2375                assert_matches!(
2376                    updates_stream.recv().await.unwrap(),
2377                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2378                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2379                        assert_matches!(
2380                            &diffs[0],
2381                            VectorDiff::Insert { index: 0, value: event } => {
2382                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2383                            }
2384                        );
2385                    }
2386                );
2387            }
2388        }
2389
2390        // Repeat that with an explicit read lock (so that we don't rely on
2391        // `event_loaded` to trigger the dirty detection).
2392        for _ in 0..3 {
2393            {
2394                let room_event_cache = &room_event_cache_p0;
2395                let updates_stream = &mut updates_stream_p0;
2396
2397                let guard = room_event_cache.inner.state.read().await.unwrap();
2398
2399                // Guard is kept alive, to ensure we can have multiple read guards alive
2400                // with shared access.
2401                // See `RoomEventCacheStateLock::read` to learn more.
2402
2403                // The lock is no longer marked as dirty, it's been cleaned.
2404                assert!(guard.is_dirty().not());
2405
2406                // The reload can be observed via the updates too.
2407                assert_matches!(
2408                    updates_stream.recv().await.unwrap(),
2409                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2410                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2411                        assert_matches!(&diffs[0], VectorDiff::Clear);
2412                        assert_matches!(
2413                            &diffs[1],
2414                            VectorDiff::Append { values: events } => {
2415                                assert_eq!(events.len(), 1);
2416                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2417                            }
2418                        );
2419                    }
2420                );
2421
2422                assert!(event_loaded(room_event_cache, ev_id_1).await);
2423                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2424
2425                // Ensure `guard` is alive up to this point (in case this test is refactored, I
2426                // want to make this super explicit).
2427                //
2428                // We need to drop it before the pagination because the pagination needs to
2429                // obtain a write lock.
2430                drop(guard);
2431
2432                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2433                assert!(event_loaded(room_event_cache, ev_id_0).await);
2434
2435                // The pagination can be observed via the updates too.
2436                assert_matches!(
2437                    updates_stream.recv().await.unwrap(),
2438                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2439                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2440                        assert_matches!(
2441                            &diffs[0],
2442                            VectorDiff::Insert { index: 0, value: event } => {
2443                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2444                            }
2445                        );
2446                    }
2447                );
2448            }
2449
2450            {
2451                let room_event_cache = &room_event_cache_p1;
2452                let updates_stream = &mut updates_stream_p1;
2453
2454                let guard = room_event_cache.inner.state.read().await.unwrap();
2455
2456                // Guard is kept alive, to ensure we can have multiple read guards alive
2457                // with shared access.
2458
2459                // The lock is no longer marked as dirty, it's been cleaned.
2460                assert!(guard.is_dirty().not());
2461
2462                // The reload can be observed via the updates too.
2463                assert_matches!(
2464                    updates_stream.recv().await.unwrap(),
2465                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2466                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2467                        assert_matches!(&diffs[0], VectorDiff::Clear);
2468                        assert_matches!(
2469                            &diffs[1],
2470                            VectorDiff::Append { values: events } => {
2471                                assert_eq!(events.len(), 1);
2472                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2473                            }
2474                        );
2475                    }
2476                );
2477
2478                assert!(event_loaded(room_event_cache, ev_id_1).await);
2479                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2480
2481                // Ensure `guard` is alive up to this point (in case this test is refactored, I
2482                // want to make this super explicit).
2483                //
2484                // We need to drop it before the pagination because the pagination needs to
2485                // obtain a write lock.
2486                drop(guard);
2487
2488                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2489                assert!(event_loaded(room_event_cache, ev_id_0).await);
2490
2491                // The pagination can be observed via the updates too.
2492                assert_matches!(
2493                    updates_stream.recv().await.unwrap(),
2494                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2495                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2496                        assert_matches!(
2497                            &diffs[0],
2498                            VectorDiff::Insert { index: 0, value: event } => {
2499                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2500                            }
2501                        );
2502                    }
2503                );
2504            }
2505        }
2506
2507        // Repeat that with an explicit write lock.
2508        for _ in 0..3 {
2509            {
2510                let room_event_cache = &room_event_cache_p0;
2511                let updates_stream = &mut updates_stream_p0;
2512
2513                let guard = room_event_cache.inner.state.write().await.unwrap();
2514
2515                // The lock is no longer marked as dirty, it's been cleaned.
2516                assert!(guard.is_dirty().not());
2517
2518                // The reload can be observed via the updates too.
2519                assert_matches!(
2520                    updates_stream.recv().await.unwrap(),
2521                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2522                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2523                        assert_matches!(&diffs[0], VectorDiff::Clear);
2524                        assert_matches!(
2525                            &diffs[1],
2526                            VectorDiff::Append { values: events } => {
2527                                assert_eq!(events.len(), 1);
2528                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2529                            }
2530                        );
2531                    }
2532                );
2533
2534                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
2535                // needs to obtain a read lock.
2536                drop(guard);
2537
2538                assert!(event_loaded(room_event_cache, ev_id_1).await);
2539                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2540
2541                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2542                assert!(event_loaded(room_event_cache, ev_id_0).await);
2543
2544                // The pagination can be observed via the updates too.
2545                assert_matches!(
2546                    updates_stream.recv().await.unwrap(),
2547                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2548                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2549                        assert_matches!(
2550                            &diffs[0],
2551                            VectorDiff::Insert { index: 0, value: event } => {
2552                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2553                            }
2554                        );
2555                    }
2556                );
2557            }
2558
2559            {
2560                let room_event_cache = &room_event_cache_p1;
2561                let updates_stream = &mut updates_stream_p1;
2562
2563                let guard = room_event_cache.inner.state.write().await.unwrap();
2564
2565                // The lock is no longer marked as dirty, it's been cleaned.
2566                assert!(guard.is_dirty().not());
2567
2568                // The reload can be observed via the updates too.
2569                assert_matches!(
2570                    updates_stream.recv().await.unwrap(),
2571                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2572                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2573                        assert_matches!(&diffs[0], VectorDiff::Clear);
2574                        assert_matches!(
2575                            &diffs[1],
2576                            VectorDiff::Append { values: events } => {
2577                                assert_eq!(events.len(), 1);
2578                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2579                            }
2580                        );
2581                    }
2582                );
2583
2584                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
2585                // needs to obtain a read lock.
2586                drop(guard);
2587
2588                assert!(event_loaded(room_event_cache, ev_id_1).await);
2589                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2590
2591                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2592                assert!(event_loaded(room_event_cache, ev_id_0).await);
2593
2594                // The pagination can be observed via the updates too.
2595                assert_matches!(
2596                    updates_stream.recv().await.unwrap(),
2597                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2598                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2599                        assert_matches!(
2600                            &diffs[0],
2601                            VectorDiff::Insert { index: 0, value: event } => {
2602                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2603                            }
2604                        );
2605                    }
2606                );
2607            }
2608        }
2609    }
2610
2611    #[async_test]
2612    async fn test_load_when_dirty() {
2613        let room_id_0 = room_id!("!raclette:patate.ch");
2614        let room_id_1 = room_id!("!morbiflette:patate.ch");
2615
2616        // The storage shared by the two clients.
2617        let event_cache_store = MemoryStore::new();
2618
2619        // Client for the process 0.
2620        let client_p0 = MockClientBuilder::new(None)
2621            .on_builder(|builder| {
2622                builder.store_config(
2623                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2624                        .event_cache_store(event_cache_store.clone()),
2625                )
2626            })
2627            .build()
2628            .await;
2629
2630        // Client for the process 1.
2631        let client_p1 = MockClientBuilder::new(None)
2632            .on_builder(|builder| {
2633                builder.store_config(
2634                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2635                        .event_cache_store(event_cache_store),
2636                )
2637            })
2638            .build()
2639            .await;
2640
2641        // Subscribe the event caches, and create the room.
        let (room_event_cache_0_p0, room_event_cache_0_p1) = {
            let event_cache_p0 = client_p0.event_cache();
            event_cache_p0.subscribe().unwrap();

            let event_cache_p1 = client_p1.event_cache();
            event_cache_p1.subscribe().unwrap();

            client_p0.base_client().get_or_create_room(room_id_0, RoomState::Joined);
            client_p0.base_client().get_or_create_room(room_id_1, RoomState::Joined);

            client_p1.base_client().get_or_create_room(room_id_0, RoomState::Joined);
            client_p1.base_client().get_or_create_room(room_id_1, RoomState::Joined);

            let (room_event_cache_0_p0, _drop_handles) =
                client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
            let (room_event_cache_0_p1, _drop_handles) =
                client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();

            (room_event_cache_0_p0, room_event_cache_0_p1)
        };

        // Let's make the cross-process lock over the store dirty.
        {
            drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
            drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
        }

        // Create the `RoomEventCache` for `room_id_1`. During its creation, the
        // cross-process lock over the store MUST be dirty, which makes no
        // difference compared to a clean one: the state is just loaded, not
        // reloaded.
        let (room_event_cache_1_p0, _) =
            client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();

        // Check that the lock isn't dirty anymore: the dirty flag has been cleared.
        {
            let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
            assert!(guard.is_dirty().not());
        }

        // The only way to test this behaviour is to check that the dirty-handling
        // block in `RoomEventCacheStateLock` is covered by this test.
    }

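    // A hedged sketch (hypothetical helper, not used by the tests): the dirty-lock
    // check pattern from `test_load_when_dirty`, factored out. It assumes, as in
    // the test above, that the state lock's read guard exposes `is_dirty()`.
    #[allow(dead_code)]
    async fn assert_state_not_dirty(room_event_cache: &RoomEventCache) {
        let guard = room_event_cache.inner.state.read().await.unwrap();
        assert!(!guard.is_dirty());
    }
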
    #[async_test]
    async fn test_uniq_read_marker() {
        let client = MockClientBuilder::new(None).build().await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();

        assert!(events.is_empty());

        // When sending the same read marker event multiple times,…
        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
            .await
            .unwrap();

        // … there's only one read marker update.
        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        assert!(stream.recv().now_or_never().is_none());

        // None, because account data doesn't trigger a generic update.
        assert!(generic_stream.recv().now_or_never().is_none());
    }

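    // A hedged sketch (hypothetical helper, not used by the tests): build the same
    // kind of `m.fully_read` account-data payload as `test_uniq_read_marker`, but
    // programmatically, from a room ID and a target event ID.
    #[allow(dead_code)]
    fn fully_read_event(room_id: &RoomId, event_id: &EventId) -> Raw<AnyRoomAccountDataEvent> {
        Raw::from_json_string(
            json!({
                "content": { "event_id": event_id },
                "room_id": room_id,
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap()
    }
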
    /// Returns `true` if the event with the given `event_id` is currently loaded
    /// in memory in this room's event cache.
    async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
        room_event_cache
            .rfind_map_event_in_memory_by(|event| {
                (event.event_id().as_deref() == Some(event_id)).then_some(())
            })
            .await
            .unwrap()
            .is_some()
    }
}
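
    // A hedged companion sketch (hypothetical, unused): the same in-memory lookup
    // as `event_loaded` above, but returning the matching event itself, via the
    // same `rfind_map_event_in_memory_by` API.
    #[allow(dead_code)]
    async fn find_event_in_memory(
        room_event_cache: &RoomEventCache,
        event_id: &EventId,
    ) -> Option<Event> {
        room_event_cache
            .rfind_map_event_in_memory_by(|event| {
                (event.event_id().as_deref() == Some(event_id)).then(|| event.clone())
            })
            .await
            .unwrap()
    }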