// matrix_sdk/event_cache/caches/room/mod.rs
1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod pagination;
16mod state;
17mod subscriber;
18mod updates;
19
20use std::{
21    collections::BTreeMap,
22    fmt,
23    sync::{Arc, atomic::Ordering},
24};
25
26use eyeball::SharedObservable;
27use matrix_sdk_base::{
28    deserialized_responses::AmbiguityChange,
29    event_cache::Event,
30    linked_chunk::Position,
31    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
32};
33use ruma::{
34    EventId, OwnedEventId, OwnedRoomId, RoomId,
35    events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
36    serde::Raw,
37};
38pub(in super::super) use state::RoomEventCacheStateLock;
39pub use subscriber::RoomEventCacheSubscriber;
40use tokio::sync::{Notify, broadcast::Receiver, mpsc};
41use tracing::{instrument, trace, warn};
42pub use updates::{
43    RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate,
44    RoomEventCacheUpdateSender,
45};
46
47use super::{
48    super::{
49        AutoShrinkChannelPayload, EventCacheError, EventsOrigin, PaginationStatus, Result,
50        RoomPagination,
51    },
52    TimelineVectorDiffs,
53    event_linked_chunk::sort_positions_descending,
54    thread::pagination::ThreadPagination,
55};
56use crate::{client::WeakClient, room::WeakRoom};
57
/// A subset of an event cache, for a room.
///
/// Cloning is shallow, and thus is cheap to do.
#[derive(Clone)]
pub struct RoomEventCache {
    /// Shared, non-cloneable details; every clone points at the same value.
    inner: Arc<RoomEventCacheInner>,
}
65
impl fmt::Debug for RoomEventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Deliberately opaque: the inner state isn't `Debug`, so only the
        // type name is printed (rendered as `RoomEventCache { .. }`).
        f.debug_struct("RoomEventCache").finish_non_exhaustive()
    }
}
71
72impl RoomEventCache {
73    /// Create a new [`RoomEventCache`] using the given room and store.
74    pub(in super::super) fn new(
75        client: WeakClient,
76        state: RoomEventCacheStateLock,
77        pagination_status: SharedObservable<PaginationStatus>,
78        room_id: OwnedRoomId,
79        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
80        update_sender: RoomEventCacheUpdateSender,
81    ) -> Self {
82        Self {
83            inner: Arc::new(RoomEventCacheInner::new(
84                client,
85                state,
86                pagination_status,
87                room_id,
88                auto_shrink_sender,
89                update_sender,
90            )),
91        }
92    }
93
    /// Get the room ID for this [`RoomEventCache`].
    pub fn room_id(&self) -> &RoomId {
        // `OwnedRoomId` derefs to `RoomId`, so this borrow coerces.
        &self.inner.room_id
    }
98
99    /// Read all current events.
100    ///
101    /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
102    /// subscriber.
103    pub async fn events(&self) -> Result<Vec<Event>> {
104        let state = self.inner.state.read().await?;
105
106        Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
107    }
108
109    /// Subscribe to this room updates, after getting the initial list of
110    /// events.
111    ///
112    /// Use [`RoomEventCache::events`] to get all current events without the
113    /// subscriber. Creating, and especially dropping, a
114    /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
115    pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
116        let state = self.inner.state.read().await?;
117        let events =
118            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
119
120        let subscriber_count = state.subscriber_count();
121        let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
122        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
123
124        let subscriber = RoomEventCacheSubscriber::new(
125            self.inner.update_sender.new_room_receiver(),
126            self.inner.room_id.clone(),
127            self.inner.auto_shrink_sender.clone(),
128            subscriber_count.clone(),
129        );
130
131        Ok((events, subscriber))
132    }
133
134    /// Subscribe to thread for a given root event, and get a (maybe empty)
135    /// initially known list of events for that thread.
136    pub async fn subscribe_to_thread(
137        &self,
138        thread_root: OwnedEventId,
139    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
140        let mut state = self.inner.state.write().await?;
141        Ok(state.subscribe_to_thread(thread_root))
142    }
143
144    /// Subscribe to the pinned event cache for this room.
145    ///
146    /// This is a persisted view over the pinned events of a room.
147    ///
148    /// The pinned events will be initially reloaded from storage, and/or loaded
149    /// from a network request to fetch the latest pinned events and their
150    /// relations, to update it as needed. The list of pinned events will
151    /// also be kept up-to-date as new events are pinned, and new
152    /// related events show up from other sources.
153    pub async fn subscribe_to_pinned_events(
154        &self,
155    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
156        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
157        let state = self.inner.state.read().await?;
158
159        state.subscribe_to_pinned_events(room).await
160    }
161
162    /// Return a [`RoomPagination`] API object useful for running
163    /// back-pagination queries in the current room.
164    pub fn pagination(&self) -> RoomPagination {
165        RoomPagination::new(self.inner.clone())
166    }
167
168    /// Return a `ThreadPagination` API object useful for running
169    /// back-pagination queries in the `thread_id` thread.
170    pub fn thread_pagination(&self, thread_id: OwnedEventId) -> ThreadPagination {
171        ThreadPagination::new(self.inner.clone(), thread_id)
172    }
173
174    /// Try to find a single event in this room, starting from the most recent
175    /// event.
176    ///
177    /// The `predicate` receives the current event as its single argument.
178    ///
179    /// **Warning**! It looks into the loaded events from the in-memory linked
180    /// chunk **only**. It doesn't look inside the storage.
181    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
182    where
183        P: FnMut(&Event) -> Option<O>,
184    {
185        Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
186    }
187
188    /// Try to find an event by ID in this room.
189    ///
190    /// It starts by looking into loaded events before looking inside the
191    /// storage.
192    pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
193        Ok(self
194            .inner
195            .state
196            .read()
197            .await?
198            .find_event(event_id)
199            .await
200            .ok()
201            .flatten()
202            .map(|(_loc, event)| event))
203    }
204
205    /// Try to find an event by ID in this room, along with its related events.
206    ///
207    /// You can filter which types of related events to retrieve using
208    /// `filter`. `None` will retrieve related events of any type.
209    ///
210    /// The related events are sorted like this:
211    ///
212    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
213    ///   located at the beginning of the array.
214    /// - events present in the linked chunk (be it in memory or in the storage)
215    ///   will be sorted according to their ordering in the linked chunk.
216    pub async fn find_event_with_relations(
217        &self,
218        event_id: &EventId,
219        filter: Option<Vec<RelationType>>,
220    ) -> Result<Option<(Event, Vec<Event>)>> {
221        // Search in all loaded or stored events.
222        Ok(self
223            .inner
224            .state
225            .read()
226            .await?
227            .find_event_with_relations(event_id, filter.clone())
228            .await
229            .ok()
230            .flatten())
231    }
232
233    /// Try to find the related events for an event by ID in this room.
234    ///
235    /// You can filter which types of related events to retrieve using
236    /// `filter`. `None` will retrieve related events of any type.
237    ///
238    /// The related events are sorted like this:
239    ///
240    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
241    ///   located at the beginning of the array.
242    /// - events present in the linked chunk (be it in memory or in the storage)
243    ///   will be sorted according to their ordering in the linked chunk.
244    pub async fn find_event_relations(
245        &self,
246        event_id: &EventId,
247        filter: Option<Vec<RelationType>>,
248    ) -> Result<Vec<Event>> {
249        // Search in all loaded or stored events.
250        self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
251    }
252
253    /// Clear all the storage for this [`RoomEventCache`].
254    ///
255    /// This will get rid of all the events from the linked chunk and persisted
256    /// storage.
257    pub async fn clear(&self) -> Result<()> {
258        // Clear the linked chunk and persisted storage.
259        let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
260
261        // Notify observers about the update.
262        self.inner.update_sender.send(
263            RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
264                diffs: updates_as_vector_diffs,
265                origin: EventsOrigin::Cache,
266            }),
267            Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
268        );
269
270        Ok(())
271    }
272
    /// Return a reference to the state.
    ///
    /// Crate-internal: gives sibling modules direct access to the state lock.
    pub(in super::super) fn state(&self) -> &RoomEventCacheStateLock {
        &self.inner.state
    }
277
278    /// Handle a [`JoinedRoomUpdate`].
279    #[instrument(skip_all, fields(room_id = %self.room_id()))]
280    pub(in super::super) async fn handle_joined_room_update(
281        &self,
282        updates: JoinedRoomUpdate,
283    ) -> Result<()> {
284        self.inner
285            .handle_timeline(updates.timeline, updates.ephemeral.clone(), updates.ambiguity_changes)
286            .await?;
287        self.inner.handle_account_data(updates.account_data);
288
289        Ok(())
290    }
291
292    /// Handle a [`LeftRoomUpdate`].
293    #[instrument(skip_all, fields(room_id = %self.room_id()))]
294    pub(in super::super) async fn handle_left_room_update(
295        &self,
296        updates: LeftRoomUpdate,
297    ) -> Result<()> {
298        self.inner.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
299
300        Ok(())
301    }
302
    /// Get a reference to the [`RoomEventCacheUpdateSender`].
    ///
    /// Crate-internal: lets sibling modules emit updates for this room.
    pub(in super::super) fn update_sender(&self) -> &RoomEventCacheUpdateSender {
        &self.inner.update_sender
    }
307
308    /// Handle a single event from the `SendQueue`.
309    pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
310        self.inner
311            .handle_timeline(
312                Timeline { limited: false, prev_batch: None, events: vec![event] },
313                Vec::new(),
314                BTreeMap::new(),
315            )
316            .await
317    }
318
319    /// Save some events in the event cache, for further retrieval with
320    /// [`Self::event`].
321    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
322        match self.inner.state.write().await {
323            Ok(mut state_guard) => {
324                if let Err(err) = state_guard.save_events(events).await {
325                    warn!("couldn't save event in the event cache: {err}");
326                }
327            }
328
329            Err(err) => {
330                warn!("couldn't save event in the event cache: {err}");
331            }
332        }
333    }
334
335    /// Return a nice debug string (a vector of lines) for the linked chunk of
336    /// events for this room.
337    pub async fn debug_string(&self) -> Vec<String> {
338        match self.inner.state.read().await {
339            Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
340            Err(err) => {
341                warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
342
343                vec![]
344            }
345        }
346    }
347}
348
/// The (non-cloneable) details of the `RoomEventCache`.
pub(in super::super) struct RoomEventCacheInner {
    /// The room id for this room.
    room_id: OwnedRoomId,

    /// A weak reference to the room, upgraded when an actual `Room` is needed
    /// (e.g. when subscribing to pinned events).
    pub weak_room: WeakRoom,

    /// State for this room's event cache.
    pub state: RoomEventCacheStateLock,

    /// A notifier that we received a new pagination token.
    pub pagination_batch_token_notifier: Notify,

    /// Observable status of back-pagination for this room.
    // NOTE(review): presumably shared with the `RoomPagination` API — confirm.
    pub pagination_status: SharedObservable<PaginationStatus>,

    /// Sender to the auto-shrink channel.
    ///
    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
    /// more details.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Update sender for this room.
    update_sender: RoomEventCacheUpdateSender,
}
373
374impl RoomEventCacheInner {
375    /// Creates a new cache for a room, and subscribes to room updates, so as
376    /// to handle new timeline events.
377    fn new(
378        client: WeakClient,
379        state: RoomEventCacheStateLock,
380        pagination_status: SharedObservable<PaginationStatus>,
381        room_id: OwnedRoomId,
382        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
383        update_sender: RoomEventCacheUpdateSender,
384    ) -> Self {
385        let weak_room = WeakRoom::new(client, room_id);
386
387        Self {
388            room_id: weak_room.room_id().to_owned(),
389            weak_room,
390            state,
391            update_sender,
392            pagination_batch_token_notifier: Default::default(),
393            auto_shrink_sender,
394            pagination_status,
395        }
396    }
397
398    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
399        if account_data.is_empty() {
400            return;
401        }
402
403        let mut handled_read_marker = false;
404
405        trace!("Handling account data");
406
407        for raw_event in account_data {
408            match raw_event.deserialize() {
409                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
410                    // If duplicated, do not forward read marker multiple times
411                    // to avoid clutter the update channel.
412                    if handled_read_marker {
413                        continue;
414                    }
415
416                    handled_read_marker = true;
417
418                    // Propagate to observers. (We ignore the error if there aren't any.)
419                    self.update_sender.send(
420                        RoomEventCacheUpdate::MoveReadMarkerTo { event_id: ev.content.event_id },
421                        None,
422                    );
423                }
424
425                Ok(_) => {
426                    // We're not interested in other room account data updates,
427                    // at this point.
428                }
429
430                Err(e) => {
431                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
432                    warn!(event_type, "Failed to deserialize account data: {e}");
433                }
434            }
435        }
436    }
437
    /// Handle a [`Timeline`], i.e. new events received by a sync for this
    /// room.
    ///
    /// Persists the new events, then notifies observers in a fixed order:
    /// timeline diffs first, then ephemeral events, then ambiguity changes.
    async fn handle_timeline(
        &self,
        timeline: Timeline,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        // Fast path: the sync brought nothing at all for this room.
        if timeline.events.is_empty()
            && timeline.prev_batch.is_none()
            && ephemeral_events.is_empty()
            && ambiguity_changes.is_empty()
        {
            return Ok(());
        }

        // Add all the events to the backend.
        trace!("adding new events");

        let (stored_prev_batch_token, timeline_event_diffs) =
            self.state.write().await?.handle_sync(timeline).await?;

        // Now that all events have been added, we can trigger the
        // `pagination_token_notifier`.
        if stored_prev_batch_token {
            self.pagination_batch_token_notifier.notify_one();
        }

        // The order matters here: first send the timeline event diffs, then only the
        // related events (read receipts, etc.).
        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Sync,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        if !ephemeral_events.is_empty() {
            self.update_sender
                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events }, None);
        }

        if !ambiguity_changes.is_empty() {
            self.update_sender
                .send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes }, None);
        }

        Ok(())
    }
490}
491
/// Where a batch of events being post-processed came from.
#[derive(Clone, Copy)]
pub(in super::super) enum PostProcessingOrigin {
    /// The events came from a sync response.
    Sync,
    /// The events came from a back-pagination.
    Backpagination,
    /// The events came from re-decryption.
    #[cfg(feature = "e2e-encryption")]
    Redecryption,
}
499
/// An enum representing where an event has been found.
pub(in super::super) enum EventLocation {
    /// Event lives in memory (and likely in the store!).
    ///
    /// Carries the event's position in the in-memory linked chunk.
    Memory(Position),

    /// Event lives in the store only, it has not been loaded in memory yet.
    Store,
}
508
509#[cfg(test)]
510mod tests {
511    use matrix_sdk_base::event_cache::Event;
512    use matrix_sdk_test::{async_test, event_factory::EventFactory};
513    use ruma::{
514        RoomId, event_id,
515        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
516        room_id, user_id,
517    };
518
519    use crate::test_utils::logged_in_client;
520
    #[async_test]
    async fn test_find_event_by_id_with_edit_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        // An edit (`m.replace`) must show up as a relation of the original.
        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("* An edited event")
                .edit(
                    original_id,
                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
                )
                .event_id(related_id)
                .into(),
            f,
        )
        .await;
    }
542
    #[async_test]
    async fn test_find_event_by_id_with_thread_reply_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        // A threaded reply must show up as a relation of the thread root.
        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
            f,
        )
        .await;
    }
558
    #[async_test]
    async fn test_find_event_by_id_with_reaction_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        // A reaction must show up as a relation of the reacted-to event.
        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.reaction(original_id, ":D").event_id(related_id).into(),
            f,
        )
        .await;
    }
574
    #[async_test]
    async fn test_find_event_by_id_with_poll_response_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        // A poll response must show up as a relation of the poll start.
        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }
592
    #[async_test]
    async fn test_find_event_by_id_with_poll_end_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        // A poll end must show up as a relation of the poll start.
        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }
610
    #[async_test]
    async fn test_find_event_by_id_with_filtered_relationships() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the associated related event, which reacts to the related
        // event.
        room_event_cache.save_events([associated_related_event]).await;

        let filter = Some(vec![RelationType::Replacement]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // There's only the edit event (an edit event can't have its own edit event).
        assert_eq!(related_events.len(), 1);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);

        // Now we'll filter threads instead, there should be no related events.
        let filter = Some(vec![RelationType::Thread]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");

        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);
        // No Thread related events found.
        assert!(related_events.is_empty());
    }
677
    #[async_test]
    async fn test_find_event_by_id_with_recursive_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the associated related event, which reacts to the related
        // event.
        room_event_cache.save_events([associated_related_event]).await;

        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, None)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // There are both the related id and the associatively related id.
        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);
    }
731
    /// Helper for the `test_find_event_by_id_with_*` tests: saves
    /// `original_event`, an unrelated event and `related_event`, then checks
    /// `find_event_with_relations` returns the original event along with
    /// exactly the related one.
    async fn assert_relations(
        room_id: &RoomId,
        original_event: Event,
        related_event: Event,
        event_factory: EventFactory,
    ) {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        let original_event_id = original_event.event_id().unwrap();
        room_event_cache.save_events([original_event]).await;

        // Save an unrelated event to check it's not in the related events list.
        let unrelated_id = event_id!("$2");
        room_event_cache
            .save_events([event_factory
                .text_msg("An unrelated event")
                .event_id(unrelated_id)
                .into()])
            .await;

        // Save the related event.
        let related_id = related_event.event_id().unwrap();
        room_event_cache.save_events([related_event]).await;

        let (event, related_events) = room_event_cache
            .find_event_with_relations(&original_event_id, None)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_event_id);

        // There is only the actually related event in the related ones.
        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
    }
778}
779
780#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
781mod timed_tests {
782    use std::{ops::Not, sync::Arc};
783
784    use assert_matches::assert_matches;
785    use assert_matches2::assert_let;
786    use eyeball_im::VectorDiff;
787    use futures_util::FutureExt;
788    use matrix_sdk_base::{
789        event_cache::{
790            Gap,
791            store::{EventCacheStore as _, MemoryStore},
792        },
793        linked_chunk::{
794            ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
795            lazy_loader::from_all_chunks,
796        },
797        store::StoreConfig,
798        sync::{JoinedRoomUpdate, Timeline},
799    };
800    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
801    use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
802    use ruma::{
803        EventId, OwnedUserId, event_id,
804        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
805        room_id, user_id,
806    };
807    use tokio::task::yield_now;
808
809    use super::{
810        super::{lock::Reload as _, pagination::LoadMoreEventsBackwardsOutcome},
811        RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
812    };
813    use crate::{
814        assert_let_timeout, event_cache::TimelineVectorDiffs, test_utils::client::MockClientBuilder,
815    };
816
817    #[async_test]
818    async fn test_write_to_storage() {
819        let room_id = room_id!("!galette:saucisse.bzh");
820        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
821
822        let event_cache_store = Arc::new(MemoryStore::new());
823
824        let client = MockClientBuilder::new(None)
825            .on_builder(|builder| {
826                builder.store_config(
827                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
828                        .event_cache_store(event_cache_store.clone()),
829                )
830            })
831            .build()
832            .await;
833
834        let event_cache = client.event_cache();
835
836        // Don't forget to subscribe and like.
837        event_cache.subscribe().unwrap();
838
839        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
840        let room = client.get_room(room_id).unwrap();
841
842        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
843        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
844
845        // Propagate an update for a message and a prev-batch token.
846        let timeline = Timeline {
847            limited: true,
848            prev_batch: Some("raclette".to_owned()),
849            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
850        };
851
852        room_event_cache
853            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
854            .await
855            .unwrap();
856
857        // Just checking the generic update is correct.
858        assert_matches!(
859            generic_stream.recv().await,
860            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
861                assert_eq!(expected_room_id, room_id);
862            }
863        );
864        assert!(generic_stream.is_empty());
865
866        // Check the storage.
867        let linked_chunk = from_all_chunks::<3, _, _>(
868            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
869        )
870        .unwrap()
871        .unwrap();
872
873        assert_eq!(linked_chunk.chunks().count(), 2);
874
875        let mut chunks = linked_chunk.chunks();
876
877        // We start with the gap.
878        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
879            assert_eq!(gap.prev_token, "raclette");
880        });
881
882        // Then we have the stored event.
883        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
884            assert_eq!(events.len(), 1);
885            let deserialized = events[0].raw().deserialize().unwrap();
886            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
887            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
888        });
889
890        // That's all, folks!
891        assert!(chunks.next().is_none());
892    }
893
    // Syncing an event that carries bundled relations must keep those
    // relations on the in-memory copy, but strip them from the copy persisted
    // to the event cache store.
    #[async_test]
    async fn test_write_to_storage_strips_bundled_relations() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        // Build a client backed by the memory store above, so we can inspect
        // what ends up persisted.
        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Propagate an update for a message with bundled relations.
        let ev = f
            .text_msg("hey yo")
            .sender(*ALICE)
            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
            .into_event();

        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };

        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        // The in-memory linked chunk keeps the bundled relation.
        {
            let events = room_event_cache.events().await.unwrap();

            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(
                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
            );

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            // The bundled edit shows up as `unsigned.relations.replace`.
            assert!(original.unsigned.relations.replace.is_some());
        }

        // The one in storage does not.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 1);

        let mut chunks = linked_chunk.chunks();
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            // The persisted copy has been stripped of the bundled edit.
            assert!(original.unsigned.relations.replace.is_none());
        });

        // That's all, folks!
        assert!(chunks.next().is_none());
    }
985
    // Clearing a room event cache empties both the in-memory linked chunk and
    // the persisted one, while individual events remain findable by ID.
    #[async_test]
    async fn test_clear() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "comté".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // The room knows about all cached events.
        {
            assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
            assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
        }

        // But only part of the events is loaded from the store.
        {
            // The room must contain only one event because only one chunk has been loaded.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].event_id().unwrap(), event_id2);

            assert!(stream.is_empty());
        }

        // Let's load more chunks to load all events.
        {
            room_event_cache.pagination().run_backwards_once(20).await.unwrap();

            assert_let_timeout!(
                Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                    stream.recv()
            );
            assert_eq!(diffs.len(), 1);
            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
                // Here you are `event_id1`!
                assert_eq!(event.event_id().unwrap(), event_id1);
            });

            assert!(stream.is_empty());

            assert_let_timeout!(
                Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
                    generic_stream.recv()
            );
            assert_eq!(room_id, expected_room_id);
            assert!(generic_stream.is_empty());
        }

        // After clearing,…
        room_event_cache.clear().await.unwrap();

        //… we get an update that the content has been cleared.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_let!(VectorDiff::Clear = &diffs[0]);

        // … same with a generic update.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
        );
        assert_eq!(received_room_id, room_id);
        assert!(generic_stream.is_empty());

        // Events individually are not forgotten by the event cache, after clearing a
        // room.
        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());

        // But their presence in a linked chunk is forgotten.
        let items = room_event_cache.events().await.unwrap();
        assert!(items.is_empty());

        // The event cache store too.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        // Note: while the event cache store could return `None` here, clearing it will
        // reset it to its initial form, maintaining the invariant that it
        // contains a single items chunk that's empty.
        assert_eq!(linked_chunk.num_items(), 0);
    }
1143
    // A room event cache initialised from a non-empty store only loads the
    // last chunk eagerly; earlier events stay findable by ID and are loaded
    // via back-pagination, and re-syncing a known event deduplicates it
    // (moving it to the end) without emitting a generic update.
    #[async_test]
    async fn test_load_from_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "cheddar".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        // Let's check whether the generic updates are received for the initialisation.
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // The room event cache has been loaded. A generic update must have been
        // triggered.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );
        assert!(generic_stream.is_empty());

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();

        // The initial items contain one event because only the last chunk is loaded by
        // default.
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].event_id().unwrap(), event_id2);
        assert!(stream.is_empty());

        // The event cache knows only all events though, even if they aren't loaded.
        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
        assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());

        // Let's paginate to load more events.
        room_event_cache.pagination().run_backwards_once(20).await.unwrap();

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
            assert_eq!(event.event_id().unwrap(), event_id1);
        });

        assert!(stream.is_empty());

        // A generic update is triggered too.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        // A new update with one of these events leads to deduplication.
        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };

        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct. There is a duplicate event, so
        // no generic changes whatsoever!
        assert!(generic_stream.recv().now_or_never().is_none());

        // The stream doesn't report these changes *yet*. Use the items vector given
        // when subscribing, to check that the items correspond to their new
        // positions. The duplicated item is removed (so it's not the first
        // element anymore), and it's added to the back of the list.
        let items = room_event_cache.events().await.unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].event_id().unwrap(), event_id1);
        assert_eq!(items[1].event_id().unwrap(), event_id2);
    }
1290
1291    #[async_test]
1292    async fn test_load_from_storage_resilient_to_failure() {
1293        let room_id = room_id!("!fondue:patate.ch");
1294        let event_cache_store = Arc::new(MemoryStore::new());
1295
1296        let event = EventFactory::new()
1297            .room(room_id)
1298            .sender(user_id!("@ben:saucisse.bzh"))
1299            .text_msg("foo")
1300            .event_id(event_id!("$42"))
1301            .into_event();
1302
1303        // Prefill the store with invalid data: two chunks that form a cycle.
1304        event_cache_store
1305            .handle_linked_chunk_updates(
1306                LinkedChunkId::Room(room_id),
1307                vec![
1308                    Update::NewItemsChunk {
1309                        previous: None,
1310                        new: ChunkIdentifier::new(0),
1311                        next: None,
1312                    },
1313                    Update::PushItems {
1314                        at: Position::new(ChunkIdentifier::new(0), 0),
1315                        items: vec![event],
1316                    },
1317                    Update::NewItemsChunk {
1318                        previous: Some(ChunkIdentifier::new(0)),
1319                        new: ChunkIdentifier::new(1),
1320                        next: Some(ChunkIdentifier::new(0)),
1321                    },
1322                ],
1323            )
1324            .await
1325            .unwrap();
1326
1327        let client = MockClientBuilder::new(None)
1328            .on_builder(|builder| {
1329                builder.store_config(
1330                    StoreConfig::new(CrossProcessLockConfig::multi_process("holder"))
1331                        .event_cache_store(event_cache_store.clone()),
1332                )
1333            })
1334            .build()
1335            .await;
1336
1337        let event_cache = client.event_cache();
1338
1339        // Don't forget to subscribe and like.
1340        event_cache.subscribe().unwrap();
1341
1342        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1343        let room = client.get_room(room_id).unwrap();
1344
1345        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1346
1347        let items = room_event_cache.events().await.unwrap();
1348
1349        // Because the persisted content was invalid, the room store is reset: there are
1350        // no events in the cache.
1351        assert!(items.is_empty());
1352
1353        // Storage doesn't contain anything. It would also be valid that it contains a
1354        // single initial empty items chunk.
1355        let raw_chunks =
1356            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
1357        assert!(raw_chunks.is_empty());
1358    }
1359
    // A gap (prev-batch token) is only recorded for *limited* sync updates;
    // a non-limited sync must not insert a new gap even though it carries a
    // prev-batch token.
    #[async_test]
    async fn test_no_useless_gaps() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        let f = EventFactory::new().room(room_id).sender(*ALICE);

        // Propagate an update including a limited timeline with one message and a
        // prev-batch token.
        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: true,
                    prev_batch: Some("raclette".to_owned()),
                    events: vec![f.text_msg("hey yo").into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        {
            let mut state = room_event_cache.inner.state.write().await.unwrap();

            // Tally gaps and events in the currently-loaded linked chunk.
            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            // The limited sync unloads the chunk, so it will appear as if there are only
            // the events.
            assert_eq!(num_gaps, 0);
            assert_eq!(num_events, 1);

            // But if I manually reload more of the chunk, the gap will be present.
            assert_matches!(
                state.load_more_events_backwards().await.unwrap(),
                LoadMoreEventsBackwardsOutcome::Gap { .. }
            );

            // Recount after loading one more chunk from the store.
            num_gaps = 0;
            num_events = 0;
            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            // The gap must have been stored.
            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 1);
        }

        // Now, propagate an update for another message, but the timeline isn't limited
        // this time.
        room_event_cache
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: false,
                    prev_batch: Some("fondue".to_owned()),
                    events: vec![f.text_msg("sup").into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        {
            let state = room_event_cache.inner.state.read().await.unwrap();

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(gap) => {
                        // Any gap found must be the one from the first, limited sync.
                        assert_eq!(gap.prev_token, "raclette");
                        num_gaps += 1;
                    }
                }
            }

            // There's only the previous gap, no new ones.
            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 2);
        }
    }
1481
    // Reloading the room's state shrinks the linked chunk back to its last
    // chunk: previously lazy-loaded chunks are dropped from memory, but
    // back-pagination can still bring them back from the store without any
    // network access.
    #[async_test]
    async fn test_shrink_to_last_chunk() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();

        // Fill the event cache store with an initial linked chunk with 2 events chunks.
        {
            client
                .event_cache_store()
                .lock()
                .await
                .expect("Could not acquire the event cache lock")
                .as_clean()
                .expect("Could not acquire a clean event cache lock")
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Sanity check: lazily loaded, so only includes one item at start.
        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
        assert!(stream.is_empty());

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Force loading the full linked chunk by back-paginating.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);

        // We also get an update about the loading from the store.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
            assert_eq!(value.event_id().as_deref(), Some(evid1));
        });

        assert!(stream.is_empty());

        // Same for the generic update.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);
        assert!(generic_stream.is_empty());

        // Shrink the linked chunk to the last chunk.
        room_event_cache
            .inner
            .state
            .write()
            .await
            .unwrap()
            .reload()
            .await
            .expect("shrinking should succeed");

        // We receive updates about the changes to the linked chunk: a clear,
        // followed by a re-append of the last chunk's events.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                stream.recv()
        );
        assert_eq!(diffs.len(), 2);
        assert_matches!(&diffs[0], VectorDiff::Clear);
        assert_matches!(&diffs[1], VectorDiff::Append { values} => {
            assert_eq!(values.len(), 1);
            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
        });

        assert!(stream.is_empty());

        // A generic update has been received.
        assert_let_timeout!(Ok(RoomEventCacheGenericUpdate { .. }) = generic_stream.recv());
        assert!(generic_stream.is_empty());

        // When reading the events, we do get only the last one.
        let events = room_event_cache.events().await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));

        // But if we back-paginate, we don't need access to network to find out about
        // the previous event.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);
    }
1613
1614    #[async_test]
1615    async fn test_room_ordering() {
1616        let room_id = room_id!("!galette:saucisse.bzh");
1617
1618        let client = MockClientBuilder::new(None).build().await;
1619
1620        let f = EventFactory::new().room(room_id).sender(*ALICE);
1621
1622        let evid1 = event_id!("$1");
1623        let evid2 = event_id!("$2");
1624        let evid3 = event_id!("$3");
1625
1626        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
1627        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
1628        let ev3 = f.text_msg("yo").event_id(evid3).into_event();
1629
1630        // Fill the event cache store with an initial linked chunk with 2 events chunks.
1631        {
1632            client
1633                .event_cache_store()
1634                .lock()
1635                .await
1636                .expect("Could not acquire the event cache lock")
1637                .as_clean()
1638                .expect("Could not acquire a clean event cache lock")
1639                .handle_linked_chunk_updates(
1640                    LinkedChunkId::Room(room_id),
1641                    vec![
1642                        Update::NewItemsChunk {
1643                            previous: None,
1644                            new: ChunkIdentifier::new(0),
1645                            next: None,
1646                        },
1647                        Update::PushItems {
1648                            at: Position::new(ChunkIdentifier::new(0), 0),
1649                            items: vec![ev1, ev2],
1650                        },
1651                        Update::NewItemsChunk {
1652                            previous: Some(ChunkIdentifier::new(0)),
1653                            new: ChunkIdentifier::new(1),
1654                            next: None,
1655                        },
1656                        Update::PushItems {
1657                            at: Position::new(ChunkIdentifier::new(1), 0),
1658                            items: vec![ev3.clone()],
1659                        },
1660                    ],
1661                )
1662                .await
1663                .unwrap();
1664        }
1665
1666        let event_cache = client.event_cache();
1667        event_cache.subscribe().unwrap();
1668
1669        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1670        let room = client.get_room(room_id).unwrap();
1671        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1672
1673        // Initially, the linked chunk only contains the last chunk, so only ev3 is
1674        // loaded.
1675        {
1676            let state = room_event_cache.inner.state.read().await.unwrap();
1677            let room_linked_chunk = state.room_linked_chunk();
1678
1679            // But we can get the order of ev1.
1680            assert_eq!(
1681                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1682                Some(0)
1683            );
1684
1685            // And that of ev2 as well.
1686            assert_eq!(
1687                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1688                Some(1)
1689            );
1690
1691            // ev3, which is loaded, also has a known ordering.
1692            let mut events = room_linked_chunk.events();
1693            let (pos, ev) = events.next().unwrap();
1694            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
1695            assert_eq!(ev.event_id().as_deref(), Some(evid3));
1696            assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1697
1698            // No other loaded events.
1699            assert!(events.next().is_none());
1700        }
1701
1702        // Force loading the full linked chunk by back-paginating.
1703        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1704        assert!(outcome.reached_start);
1705
1706        // All events are now loaded, so their order is precisely their enumerated index
1707        // in a linear iteration.
1708        {
1709            let state = room_event_cache.inner.state.read().await.unwrap();
1710            let room_linked_chunk = state.room_linked_chunk();
1711
1712            for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
1713                assert_eq!(room_linked_chunk.event_order(pos), Some(i));
1714            }
1715        }
1716
1717        // Handle a gappy sync with two events (including one duplicate, so
1718        // deduplication kicks in), so that the linked chunk is shrunk to the
1719        // last chunk, and that the linked chunk only contains the last two
1720        // events.
1721        let evid4 = event_id!("$4");
1722        room_event_cache
1723            .handle_joined_room_update(JoinedRoomUpdate {
1724                timeline: Timeline {
1725                    limited: true,
1726                    prev_batch: Some("fondue".to_owned()),
1727                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
1728                },
1729                ..Default::default()
1730            })
1731            .await
1732            .unwrap();
1733
1734        {
1735            let state = room_event_cache.inner.state.read().await.unwrap();
1736            let room_linked_chunk = state.room_linked_chunk();
1737
1738            // After the shrink, only evid3 and evid4 are loaded.
1739            let mut events = room_linked_chunk.events();
1740
1741            let (pos, ev) = events.next().unwrap();
1742            assert_eq!(ev.event_id().as_deref(), Some(evid3));
1743            assert_eq!(room_linked_chunk.event_order(pos), Some(2));
1744
1745            let (pos, ev) = events.next().unwrap();
1746            assert_eq!(ev.event_id().as_deref(), Some(evid4));
1747            assert_eq!(room_linked_chunk.event_order(pos), Some(3));
1748
1749            // No other loaded events.
1750            assert!(events.next().is_none());
1751
1752            // But we can still get the order of previous events.
1753            assert_eq!(
1754                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
1755                Some(0)
1756            );
1757            assert_eq!(
1758                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
1759                Some(1)
1760            );
1761
1762            // ev3 doesn't have an order with its previous position, since it's been
1763            // deduplicated.
1764            assert_eq!(
1765                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
1766                None
1767            );
1768        }
1769    }
1770
    // Checks that once the last `RoomEventCacheSubscriber` is dropped, the
    // room's in-memory linked chunk shrinks back to its last chunk, even though
    // earlier subscribers had caused more chunks to be loaded via pagination.
    #[async_test]
    async fn test_auto_shrink_after_all_subscribers_are_gone() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();

        // Fill the event cache store with an initial linked chunk with 2 events chunks:
        // chunk 0 holds ev1, chunk 1 (the last one) holds ev2.
        {
            client
                .event_cache_store()
                .lock()
                .await
                .expect("Could not acquire the event cache lock")
                .as_clean()
                .expect("Could not acquire a clean event cache lock")
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Sanity check: lazily loaded, so only includes one item (ev2, from the last
        // chunk) at start.
        let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
        assert_eq!(events1.len(), 1);
        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
        assert!(stream1.is_empty());

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Force loading the full linked chunk by back-paginating; this loads ev1 too.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);

        // We also get an update about the loading from the store. Ignore it, for this
        // test's sake.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                stream1.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
            assert_eq!(value.event_id().as_deref(), Some(evid1));
        });

        assert!(stream1.is_empty());

        // A generic update accompanies the timeline update.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);
        assert!(generic_stream.is_empty());

        // Have another subscriber.
        // Since it's not the first one, and the previous one loaded some more events,
        // the second subscriber sees them all.
        let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
        assert_eq!(events2.len(), 2);
        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
        assert!(stream2.is_empty());

        // Drop the first stream, and wait a bit (the yield lets background tasks run).
        drop(stream1);
        yield_now().await;

        // The second stream remains undisturbed: a subscriber is still alive, so no
        // shrink happens yet.
        assert!(stream2.is_empty());

        // Now drop the second stream, and wait a bit.
        drop(stream2);
        yield_now().await;

        // The linked chunk must have auto-shrunk by now.

        {
            // Check the inner state: there's no more shared auto-shrinker, i.e. the
            // subscriber count went back to zero.
            let state = room_event_cache.inner.state.read().await.unwrap();
            assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
        }

        // Getting the events will only give us the latest chunk (ev2), confirming
        // the shrink happened.
        let events3 = room_event_cache.events().await.unwrap();
        assert_eq!(events3.len(), 1);
        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
    }
1894
1895    #[async_test]
1896    async fn test_rfind_map_event_in_memory_by() {
1897        let user_id = user_id!("@mnt_io:matrix.org");
1898        let room_id = room_id!("!raclette:patate.ch");
1899        let client = MockClientBuilder::new(None).build().await;
1900
1901        let event_factory = EventFactory::new().room(room_id);
1902
1903        let event_id_0 = event_id!("$ev0");
1904        let event_id_1 = event_id!("$ev1");
1905        let event_id_2 = event_id!("$ev2");
1906        let event_id_3 = event_id!("$ev3");
1907
1908        let event_0 =
1909            event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
1910        let event_1 =
1911            event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
1912        let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
1913        let event_3 =
1914            event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
1915
1916        // Fill the event cache store with an initial linked chunk of 2 chunks, and 4
1917        // events.
1918        {
1919            client
1920                .event_cache_store()
1921                .lock()
1922                .await
1923                .expect("Could not acquire the event cache lock")
1924                .as_clean()
1925                .expect("Could not acquire a clean event cache lock")
1926                .handle_linked_chunk_updates(
1927                    LinkedChunkId::Room(room_id),
1928                    vec![
1929                        Update::NewItemsChunk {
1930                            previous: None,
1931                            new: ChunkIdentifier::new(0),
1932                            next: None,
1933                        },
1934                        Update::PushItems {
1935                            at: Position::new(ChunkIdentifier::new(0), 0),
1936                            items: vec![event_3],
1937                        },
1938                        Update::NewItemsChunk {
1939                            previous: Some(ChunkIdentifier::new(0)),
1940                            new: ChunkIdentifier::new(1),
1941                            next: None,
1942                        },
1943                        Update::PushItems {
1944                            at: Position::new(ChunkIdentifier::new(1), 0),
1945                            items: vec![event_0, event_1, event_2],
1946                        },
1947                    ],
1948                )
1949                .await
1950                .unwrap();
1951        }
1952
1953        let event_cache = client.event_cache();
1954        event_cache.subscribe().unwrap();
1955
1956        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1957        let room = client.get_room(room_id).unwrap();
1958        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1959
1960        // Look for an event from `BOB`: it must be `event_0`.
1961        assert_matches!(
1962            room_event_cache
1963                .rfind_map_event_in_memory_by(|event| {
1964                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| event.event_id())
1965                })
1966                .await,
1967            Ok(Some(event_id)) => {
1968                assert_eq!(event_id.as_deref(), Some(event_id_0));
1969            }
1970        );
1971
1972        // Look for an event from `ALICE`: it must be `event_2`, right before `event_1`
1973        // because events are looked for in reverse order.
1974        assert_matches!(
1975            room_event_cache
1976                .rfind_map_event_in_memory_by(|event| {
1977                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| event.event_id())
1978                })
1979                .await,
1980            Ok(Some(event_id)) => {
1981                assert_eq!(event_id.as_deref(), Some(event_id_2));
1982            }
1983        );
1984
1985        // Look for an event that is inside the storage, but not loaded.
1986        assert!(
1987            room_event_cache
1988                .rfind_map_event_in_memory_by(|event| {
1989                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
1990                        == Some(user_id))
1991                    .then(|| event.event_id())
1992                })
1993                .await
1994                .unwrap()
1995                .is_none()
1996        );
1997
1998        // Look for an event that doesn't exist.
1999        assert!(
2000            room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap().is_none()
2001        );
2002    }
2003
2004    #[async_test]
2005    async fn test_reload_when_dirty() {
2006        let user_id = user_id!("@mnt_io:matrix.org");
2007        let room_id = room_id!("!raclette:patate.ch");
2008
2009        // The storage shared by the two clients.
2010        let event_cache_store = MemoryStore::new();
2011
2012        // Client for the process 0.
2013        let client_p0 = MockClientBuilder::new(None)
2014            .on_builder(|builder| {
2015                builder.store_config(
2016                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
2017                        .event_cache_store(event_cache_store.clone()),
2018                )
2019            })
2020            .build()
2021            .await;
2022
2023        // Client for the process 1.
2024        let client_p1 = MockClientBuilder::new(None)
2025            .on_builder(|builder| {
2026                builder.store_config(
2027                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
2028                        .event_cache_store(event_cache_store),
2029                )
2030            })
2031            .build()
2032            .await;
2033
2034        let event_factory = EventFactory::new().room(room_id).sender(user_id);
2035
2036        let ev_id_0 = event_id!("$ev_0");
2037        let ev_id_1 = event_id!("$ev_1");
2038
2039        let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
2040        let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
2041
2042        // Add events to the storage (shared by the two clients!).
2043        client_p0
2044            .event_cache_store()
2045            .lock()
2046            .await
2047            .expect("[p0] Could not acquire the event cache lock")
2048            .as_clean()
2049            .expect("[p0] Could not acquire a clean event cache lock")
2050            .handle_linked_chunk_updates(
2051                LinkedChunkId::Room(room_id),
2052                vec![
2053                    Update::NewItemsChunk {
2054                        previous: None,
2055                        new: ChunkIdentifier::new(0),
2056                        next: None,
2057                    },
2058                    Update::PushItems {
2059                        at: Position::new(ChunkIdentifier::new(0), 0),
2060                        items: vec![ev_0],
2061                    },
2062                    Update::NewItemsChunk {
2063                        previous: Some(ChunkIdentifier::new(0)),
2064                        new: ChunkIdentifier::new(1),
2065                        next: None,
2066                    },
2067                    Update::PushItems {
2068                        at: Position::new(ChunkIdentifier::new(1), 0),
2069                        items: vec![ev_1],
2070                    },
2071                ],
2072            )
2073            .await
2074            .unwrap();
2075
2076        // Subscribe the event caches, and create the room.
2077        let (room_event_cache_p0, room_event_cache_p1) = {
2078            let event_cache_p0 = client_p0.event_cache();
2079            event_cache_p0.subscribe().unwrap();
2080
2081            let event_cache_p1 = client_p1.event_cache();
2082            event_cache_p1.subscribe().unwrap();
2083
2084            client_p0.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2085            client_p1.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2086
2087            let (room_event_cache_p0, _drop_handles) =
2088                client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
2089            let (room_event_cache_p1, _drop_handles) =
2090                client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
2091
2092            (room_event_cache_p0, room_event_cache_p1)
2093        };
2094
2095        // Okay. We are ready for the test!
2096        //
2097        // First off, let's check `room_event_cache_p0` has access to the first event
2098        // loaded in-memory, then do a pagination, and see more events.
2099        let mut updates_stream_p0 = {
2100            let room_event_cache = &room_event_cache_p0;
2101
2102            let (initial_updates, mut updates_stream) =
2103                room_event_cache_p0.subscribe().await.unwrap();
2104
2105            // Initial updates contain `ev_id_1` only.
2106            assert_eq!(initial_updates.len(), 1);
2107            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2108            assert!(updates_stream.is_empty());
2109
2110            // `ev_id_1` must be loaded in memory.
2111            assert!(event_loaded(room_event_cache, ev_id_1).await);
2112
2113            // `ev_id_0` must NOT be loaded in memory.
2114            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2115
2116            // Load one more event with a backpagination.
2117            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2118
2119            // A new update for `ev_id_0` must be present.
2120            assert_matches!(
2121                updates_stream.recv().await.unwrap(),
2122                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2123                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
2124                    assert_matches!(
2125                        &diffs[0],
2126                        VectorDiff::Insert { index: 0, value: event } => {
2127                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2128                        }
2129                    );
2130                }
2131            );
2132
2133            // `ev_id_0` must now be loaded in memory.
2134            assert!(event_loaded(room_event_cache, ev_id_0).await);
2135
2136            updates_stream
2137        };
2138
2139        // Second, let's check `room_event_cache_p1` has the same accesses.
2140        let mut updates_stream_p1 = {
2141            let room_event_cache = &room_event_cache_p1;
2142            let (initial_updates, mut updates_stream) =
2143                room_event_cache_p1.subscribe().await.unwrap();
2144
2145            // Initial updates contain `ev_id_1` only.
2146            assert_eq!(initial_updates.len(), 1);
2147            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
2148            assert!(updates_stream.is_empty());
2149
2150            // `ev_id_1` must be loaded in memory.
2151            assert!(event_loaded(room_event_cache, ev_id_1).await);
2152
2153            // `ev_id_0` must NOT be loaded in memory.
2154            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2155
2156            // Load one more event with a backpagination.
2157            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2158
2159            // A new update for `ev_id_0` must be present.
2160            assert_matches!(
2161                updates_stream.recv().await.unwrap(),
2162                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2163                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
2164                    assert_matches!(
2165                        &diffs[0],
2166                        VectorDiff::Insert { index: 0, value: event } => {
2167                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2168                        }
2169                    );
2170                }
2171            );
2172
2173            // `ev_id_0` must now be loaded in memory.
2174            assert!(event_loaded(room_event_cache, ev_id_0).await);
2175
2176            updates_stream
2177        };
2178
2179        // Do this a couple times, for the fun.
2180        for _ in 0..3 {
2181            // Third, because `room_event_cache_p1` has locked the store, the lock
2182            // is dirty for `room_event_cache_p0`, so it will shrink to its last
2183            // chunk!
2184            {
2185                let room_event_cache = &room_event_cache_p0;
2186                let updates_stream = &mut updates_stream_p0;
2187
2188                // `ev_id_1` must be loaded in memory, just like before.
2189                assert!(event_loaded(room_event_cache, ev_id_1).await);
2190
2191                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
2192                // state has been reloaded to its last chunk.
2193                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2194
2195                // The reload can be observed via the updates too.
2196                assert_matches!(
2197                    updates_stream.recv().await.unwrap(),
2198                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2199                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2200                        assert_matches!(&diffs[0], VectorDiff::Clear);
2201                        assert_matches!(
2202                            &diffs[1],
2203                            VectorDiff::Append { values: events } => {
2204                                assert_eq!(events.len(), 1);
2205                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2206                            }
2207                        );
2208                    }
2209                );
2210
2211                // Load one more event with a backpagination.
2212                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2213
2214                // `ev_id_0` must now be loaded in memory.
2215                assert!(event_loaded(room_event_cache, ev_id_0).await);
2216
2217                // The pagination can be observed via the updates too.
2218                assert_matches!(
2219                    updates_stream.recv().await.unwrap(),
2220                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2221                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2222                        assert_matches!(
2223                            &diffs[0],
2224                            VectorDiff::Insert { index: 0, value: event } => {
2225                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2226                            }
2227                        );
2228                    }
2229                );
2230            }
2231
2232            // Fourth, because `room_event_cache_p0` has locked the store again, the lock
2233            // is dirty for `room_event_cache_p1` too!, so it will shrink to its last
2234            // chunk!
2235            {
2236                let room_event_cache = &room_event_cache_p1;
2237                let updates_stream = &mut updates_stream_p1;
2238
2239                // `ev_id_1` must be loaded in memory, just like before.
2240                assert!(event_loaded(room_event_cache, ev_id_1).await);
2241
2242                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
2243                // state has shrunk to its last chunk.
2244                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2245
2246                // The reload can be observed via the updates too.
2247                assert_matches!(
2248                    updates_stream.recv().await.unwrap(),
2249                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2250                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2251                        assert_matches!(&diffs[0], VectorDiff::Clear);
2252                        assert_matches!(
2253                            &diffs[1],
2254                            VectorDiff::Append { values: events } => {
2255                                assert_eq!(events.len(), 1);
2256                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2257                            }
2258                        );
2259                    }
2260                );
2261
2262                // Load one more event with a backpagination.
2263                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2264
2265                // `ev_id_0` must now be loaded in memory.
2266                assert!(event_loaded(room_event_cache, ev_id_0).await);
2267
2268                // The pagination can be observed via the updates too.
2269                assert_matches!(
2270                    updates_stream.recv().await.unwrap(),
2271                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2272                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2273                        assert_matches!(
2274                            &diffs[0],
2275                            VectorDiff::Insert { index: 0, value: event } => {
2276                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2277                            }
2278                        );
2279                    }
2280                );
2281            }
2282        }
2283
2284        // Repeat that with an explicit read lock (so that we don't rely on
2285        // `event_loaded` to trigger the dirty detection).
2286        for _ in 0..3 {
2287            {
2288                let room_event_cache = &room_event_cache_p0;
2289                let updates_stream = &mut updates_stream_p0;
2290
2291                let guard = room_event_cache.inner.state.read().await.unwrap();
2292
2293                // Guard is kept alive, to ensure we can have multiple read guards alive with a
2294                // shared access.
2295                // See `RoomEventCacheStateLock::read` to learn more.
2296
2297                // The lock is no longer marked as dirty, it's been cleaned.
2298                assert!(guard.is_dirty().not());
2299
2300                // The reload can be observed via the updates too.
2301                assert_matches!(
2302                    updates_stream.recv().await.unwrap(),
2303                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2304                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2305                        assert_matches!(&diffs[0], VectorDiff::Clear);
2306                        assert_matches!(
2307                            &diffs[1],
2308                            VectorDiff::Append { values: events } => {
2309                                assert_eq!(events.len(), 1);
2310                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2311                            }
2312                        );
2313                    }
2314                );
2315
2316                assert!(event_loaded(room_event_cache, ev_id_1).await);
2317                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2318
2319                // Ensure `guard` is alive up to this point (in case this test is refactored, I
2320                // want to make this super explicit).
2321                //
                // We need to drop it before the pagination because the pagination needs to
2323                // obtain a write lock.
2324                drop(guard);
2325
2326                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2327                assert!(event_loaded(room_event_cache, ev_id_0).await);
2328
2329                // The pagination can be observed via the updates too.
2330                assert_matches!(
2331                    updates_stream.recv().await.unwrap(),
2332                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2333                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2334                        assert_matches!(
2335                            &diffs[0],
2336                            VectorDiff::Insert { index: 0, value: event } => {
2337                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2338                            }
2339                        );
2340                    }
2341                );
2342            }
2343
2344            {
2345                let room_event_cache = &room_event_cache_p1;
2346                let updates_stream = &mut updates_stream_p1;
2347
2348                let guard = room_event_cache.inner.state.read().await.unwrap();
2349
2350                // Guard is kept alive, to ensure we can have multiple read guards alive with a
2351                // shared access.
2352
2353                // The lock is no longer marked as dirty, it's been cleaned.
2354                assert!(guard.is_dirty().not());
2355
2356                // The reload can be observed via the updates too.
2357                assert_matches!(
2358                    updates_stream.recv().await.unwrap(),
2359                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2360                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2361                        assert_matches!(&diffs[0], VectorDiff::Clear);
2362                        assert_matches!(
2363                            &diffs[1],
2364                            VectorDiff::Append { values: events } => {
2365                                assert_eq!(events.len(), 1);
2366                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2367                            }
2368                        );
2369                    }
2370                );
2371
2372                assert!(event_loaded(room_event_cache, ev_id_1).await);
2373                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2374
2375                // Ensure `guard` is alive up to this point (in case this test is refactored, I
2376                // want to make this super explicit).
2377                //
                // We need to drop it before the pagination because the pagination needs to
2379                // obtain a write lock.
2380                drop(guard);
2381
2382                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2383                assert!(event_loaded(room_event_cache, ev_id_0).await);
2384
2385                // The pagination can be observed via the updates too.
2386                assert_matches!(
2387                    updates_stream.recv().await.unwrap(),
2388                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2389                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2390                        assert_matches!(
2391                            &diffs[0],
2392                            VectorDiff::Insert { index: 0, value: event } => {
2393                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2394                            }
2395                        );
2396                    }
2397                );
2398            }
2399        }
2400
2401        // Repeat that with an explicit write lock.
2402        for _ in 0..3 {
2403            {
2404                let room_event_cache = &room_event_cache_p0;
2405                let updates_stream = &mut updates_stream_p0;
2406
2407                let guard = room_event_cache.inner.state.write().await.unwrap();
2408
2409                // The lock is no longer marked as dirty, it's been cleaned.
2410                assert!(guard.is_dirty().not());
2411
2412                // The reload can be observed via the updates too.
2413                assert_matches!(
2414                    updates_stream.recv().await.unwrap(),
2415                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2416                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2417                        assert_matches!(&diffs[0], VectorDiff::Clear);
2418                        assert_matches!(
2419                            &diffs[1],
2420                            VectorDiff::Append { values: events } => {
2421                                assert_eq!(events.len(), 1);
2422                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2423                            }
2424                        );
2425                    }
2426                );
2427
2428                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
2429                // needs to obtain a read lock.
2430                drop(guard);
2431
2432                assert!(event_loaded(room_event_cache, ev_id_1).await);
2433                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2434
2435                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2436                assert!(event_loaded(room_event_cache, ev_id_0).await);
2437
2438                // The pagination can be observed via the updates too.
2439                assert_matches!(
2440                    updates_stream.recv().await.unwrap(),
2441                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2442                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2443                        assert_matches!(
2444                            &diffs[0],
2445                            VectorDiff::Insert { index: 0, value: event } => {
2446                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2447                            }
2448                        );
2449                    }
2450                );
2451            }
2452
2453            {
2454                let room_event_cache = &room_event_cache_p1;
2455                let updates_stream = &mut updates_stream_p1;
2456
2457                let guard = room_event_cache.inner.state.write().await.unwrap();
2458
2459                // The lock is no longer marked as dirty, it's been cleaned.
2460                assert!(guard.is_dirty().not());
2461
2462                // The reload can be observed via the updates too.
2463                assert_matches!(
2464                    updates_stream.recv().await.unwrap(),
2465                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2466                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
2467                        assert_matches!(&diffs[0], VectorDiff::Clear);
2468                        assert_matches!(
2469                            &diffs[1],
2470                            VectorDiff::Append { values: events } => {
2471                                assert_eq!(events.len(), 1);
2472                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
2473                            }
2474                        );
2475                    }
2476                );
2477
2478                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
2479                // needs to obtain a read lock.
2480                drop(guard);
2481
2482                assert!(event_loaded(room_event_cache, ev_id_1).await);
2483                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
2484
2485                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
2486                assert!(event_loaded(room_event_cache, ev_id_0).await);
2487
2488                // The pagination can be observed via the updates too.
2489                assert_matches!(
2490                    updates_stream.recv().await.unwrap(),
2491                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
2492                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
2493                        assert_matches!(
2494                            &diffs[0],
2495                            VectorDiff::Insert { index: 0, value: event } => {
2496                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
2497                            }
2498                        );
2499                    }
2500                );
2501            }
2502        }
2503    }
2504
    /// Ensure that creating a `RoomEventCache` while the cross-process store
    /// lock is dirty behaves like creating it with a clean lock: the state is
    /// loaded for the first time, so there is nothing to reload.
    #[async_test]
    async fn test_load_when_dirty() {
        // Two rooms: `room_id_0` is opened before the lock becomes dirty,
        // `room_id_1` only afterwards.
        let room_id_0 = room_id!("!raclette:patate.ch");
        let room_id_1 = room_id!("!morbiflette:patate.ch");

        // The storage shared by the two clients, simulating two processes
        // operating on the same store.
        let event_cache_store = MemoryStore::new();

        // Client for the process 0.
        let client_p0 = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        // Client for the process 1, sharing the same event cache store.
        let client_p1 = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
                        .event_cache_store(event_cache_store),
                )
            })
            .build()
            .await;

        // Subscribe the event caches, and create both rooms on both clients.
        let (room_event_cache_0_p0, room_event_cache_0_p1) = {
            let event_cache_p0 = client_p0.event_cache();
            event_cache_p0.subscribe().unwrap();

            let event_cache_p1 = client_p1.event_cache();
            event_cache_p1.subscribe().unwrap();

            client_p0
                .base_client()
                .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
            client_p0
                .base_client()
                .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);

            client_p1
                .base_client()
                .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
            client_p1
                .base_client()
                .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);

            // Only `room_id_0` gets its `RoomEventCache` now; `room_id_1`'s is
            // created later, once the lock is dirty.
            let (room_event_cache_0_p0, _drop_handles) =
                client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
            let (room_event_cache_0_p1, _drop_handles) =
                client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();

            (room_event_cache_0_p0, room_event_cache_0_p1)
        };

        // Let's make the cross-process lock over the store dirty: taking the
        // lock from each process in turn marks the other process' view of the
        // store as dirty.
        {
            drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
            drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
        }

        // Create the `RoomEventCache` for `room_id_1`. During its creation, the
        // cross-process lock over the store MUST be dirty, which makes no
        // difference compared to a clean one: the state is just loaded, not
        // reloaded.
        let (room_event_cache_1_p0, _) =
            client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();

        // Check the lock isn't dirty because it's been cleared.
        {
            let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
            assert!(guard.is_dirty().not());
        }

        // The only way to test this behaviour is to see that the dirty block in
        // `RoomEventCacheStateLock` is covered by this test.
    }
2586
2587    async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
2588        room_event_cache
2589            .rfind_map_event_in_memory_by(|event| {
2590                (event.event_id().as_deref() == Some(event_id)).then_some(())
2591            })
2592            .await
2593            .unwrap()
2594            .is_some()
2595    }
2596}