matrix_sdk/event_cache/room/mod.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! All event cache types for a single room.

use std::{
    collections::BTreeMap,
    fmt,
    ops::{Deref, DerefMut},
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};

use events::sort_positions_descending;
use eyeball::SharedObservable;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
    deserialized_responses::AmbiguityChange,
    event_cache::Event,
    linked_chunk::Position,
    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
    api::Direction,
    events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
    serde::Raw,
    EventId, OwnedEventId, OwnedRoomId,
};
use tokio::sync::{
    broadcast::{Receiver, Sender},
    mpsc, Notify, RwLock,
};
use tracing::{instrument, trace, warn};

use super::{
    AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
    RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
};
use crate::{
    client::WeakClient,
    event_cache::EventCacheError,
    room::{IncludeRelations, RelationsOptions, WeakRoom},
};

pub(super) mod events;
mod threads;

pub use threads::ThreadEventCacheUpdate;

/// A subset of an event cache, for a room.
///
/// Cloning is shallow, and thus is cheap to do.
#[derive(Clone)]
pub struct RoomEventCache {
    pub(super) inner: Arc<RoomEventCacheInner>,
}

impl fmt::Debug for RoomEventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RoomEventCache").finish_non_exhaustive()
    }
}

/// Thin wrapper for a room event cache subscriber, so as to trigger
/// side-effects when all subscribers are gone.
///
/// The current side-effect is: auto-shrinking the [`RoomEventCache`] when no
/// more subscribers are active. This is an optimisation to reduce the amount of
/// data held in memory by a [`RoomEventCache`]: when no more subscribers are
/// active, its data is reduced to the minimum.
///
/// The side-effect takes effect on `Drop`.
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheSubscriber {
    /// Underlying receiver of the room event cache's updates.
    recv: Receiver<RoomEventCacheUpdate>,

    /// To which room are we listening?
    room_id: OwnedRoomId,

    /// Sender to the auto-shrink channel.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Shared instance of the auto-shrinker.
    subscriber_count: Arc<AtomicUsize>,
}

impl Drop for RoomEventCacheSubscriber {
    fn drop(&mut self) {
        let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);

        trace!(
            "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
        );

        if previous_subscriber_count == 1 {
            // We were the last instance of the subscriber; let the auto-shrinker know by
            // notifying it of our room id.

            let mut room_id = self.room_id.clone();

            // Try to send without waiting for channel capacity, and restart in a spin-loop
            // if it failed (until a maximum number of attempts is reached, or
            // the send was successful). The channel shouldn't be super busy in
            // general, so this should resolve quickly enough.

            let mut num_attempts = 0;

            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
                num_attempts += 1;

                if num_attempts > 1024 {
                    // If we've tried too many times, just give up with a warning; after all, this
                    // is only an optimization.
                    warn!(
                        "couldn't send notification to the auto-shrink channel \
                         after 1024 attempts; giving up"
                    );
                    return;
                }

                match err {
                    mpsc::error::TrySendError::Full(stolen_room_id) => {
                        room_id = stolen_room_id;
                    }
                    mpsc::error::TrySendError::Closed(_) => return,
                }
            }

            trace!("sent notification to the parent channel that we were the last subscriber");
        }
    }
}

impl Deref for RoomEventCacheSubscriber {
    type Target = Receiver<RoomEventCacheUpdate>;

    fn deref(&self) -> &Self::Target {
        &self.recv
    }
}

impl DerefMut for RoomEventCacheSubscriber {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.recv
    }
}

impl RoomEventCache {
    /// Create a new [`RoomEventCache`] using the given room and store.
    pub(super) fn new(
        client: WeakClient,
        state: RoomEventCacheState,
        pagination_status: SharedObservable<RoomPaginationStatus>,
        room_id: OwnedRoomId,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
    ) -> Self {
        Self {
            inner: Arc::new(RoomEventCacheInner::new(
                client,
                state,
                pagination_status,
                room_id,
                auto_shrink_sender,
                generic_update_sender,
            )),
        }
    }

    /// Read all current events.
    ///
    /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
    /// subscriber.
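    ///
    /// A minimal usage sketch (illustrative only; the `room_cache` handle and
    /// the re-export path are assumptions):
    ///
    /// ```ignore
    /// # use matrix_sdk::event_cache::RoomEventCache;
    /// async fn dump_events(room_cache: &RoomEventCache) {
    ///     // Only returns what is currently loaded in the in-memory linked chunk.
    ///     for event in room_cache.events().await {
    ///         println!("{:?}", event.event_id());
    ///     }
    /// }
    /// ```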
    pub async fn events(&self) -> Vec<Event> {
        let state = self.inner.state.read().await;

        state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect()
    }

    /// Subscribe to this room's updates, after getting the initial list of
    /// events.
    ///
    /// Use [`RoomEventCache::events`] to get all current events without the
    /// subscriber. Creating, and especially dropping, a
    /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
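    ///
    /// A minimal usage sketch (illustrative only; the `room_cache` handle and
    /// the re-export path are assumptions):
    ///
    /// ```ignore
    /// # use matrix_sdk::event_cache::RoomEventCache;
    /// async fn watch_room(room_cache: &RoomEventCache) {
    ///     let (initial_events, mut subscriber) = room_cache.subscribe().await;
    ///     println!("starting with {} events", initial_events.len());
    ///
    ///     // The subscriber derefs to a `broadcast::Receiver`, so updates can be
    ///     // received directly; dropping it may trigger auto-shrinking.
    ///     while let Ok(update) = subscriber.recv().await {
    ///         // Handle the `RoomEventCacheUpdate` here.
    ///         let _ = update;
    ///     }
    /// }
    /// ```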
    pub async fn subscribe(&self) -> (Vec<Event>, RoomEventCacheSubscriber) {
        let state = self.inner.state.read().await;
        let events =
            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();

        let previous_subscriber_count = state.subscriber_count.fetch_add(1, Ordering::SeqCst);
        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);

        let recv = self.inner.sender.subscribe();
        let subscriber = RoomEventCacheSubscriber {
            recv,
            room_id: self.inner.room_id.clone(),
            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
            subscriber_count: state.subscriber_count.clone(),
        };

        (events, subscriber)
    }

    /// Subscribe to the thread for a given root event, and get a (maybe empty)
    /// initially known list of events for that thread.
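    ///
    /// A minimal usage sketch (illustrative only; the `room_cache` handle and
    /// the re-export paths are assumptions):
    ///
    /// ```ignore
    /// # use matrix_sdk::event_cache::RoomEventCache;
    /// # use matrix_sdk::ruma::OwnedEventId;
    /// async fn watch_thread(room_cache: &RoomEventCache, thread_root: OwnedEventId) {
    ///     let (initial_events, mut updates) = room_cache.subscribe_to_thread(thread_root).await;
    ///     println!("thread starts with {} known events", initial_events.len());
    ///
    ///     while let Ok(update) = updates.recv().await {
    ///         // Handle the `ThreadEventCacheUpdate` here.
    ///         let _ = update;
    ///     }
    /// }
    /// ```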
    pub async fn subscribe_to_thread(
        &self,
        thread_root: OwnedEventId,
    ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
        let mut state = self.inner.state.write().await;
        state.subscribe_to_thread(thread_root)
    }

    /// Paginate backwards in a thread, given its root event ID.
    ///
    /// Returns whether we've hit the start of the thread, in which case the
    /// root event will be prepended to the thread.
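    ///
    /// A minimal usage sketch (illustrative only; the `room_cache` handle, the
    /// re-export paths, and the batch size of 20 are assumptions):
    ///
    /// ```ignore
    /// # use matrix_sdk::event_cache::RoomEventCache;
    /// # use matrix_sdk::ruma::OwnedEventId;
    /// async fn backfill_thread(room_cache: &RoomEventCache, thread_root: OwnedEventId) {
    ///     // Ask for up to 20 older events; `true` means the start of the thread
    ///     // was reached, and the thread root has been prepended to the events.
    ///     let reached_start = room_cache
    ///         .paginate_thread_backwards(thread_root, 20)
    ///         .await
    ///         .expect("thread back-pagination failed");
    ///     println!("reached the start of the thread: {reached_start}");
    /// }
    /// ```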
    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
    pub async fn paginate_thread_backwards(
        &self,
        thread_root: OwnedEventId,
        num_events: u16,
    ) -> Result<bool> {
        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;

        // Take the lock only for a short time here.
        let mut outcome =
            self.inner.state.write().await.load_more_thread_events_backwards(thread_root.clone());

        loop {
            match outcome {
                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
                    // Start a threaded pagination from this gap.
                    let options = RelationsOptions {
                        from: prev_token.clone(),
                        dir: Direction::Backward,
                        limit: Some(num_events.into()),
                        include_relations: IncludeRelations::AllRelations,
                        recurse: true,
                    };

                    let mut result = room
                        .relations(thread_root.clone(), options)
                        .await
                        .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

                    let reached_start = result.next_batch_token.is_none();
                    trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");

                    // Because the state lock is taken again in `load_or_fetch_event`, we need
                    // to do this *before* we take the state lock again.
                    if reached_start {
                        // Prepend the thread root event to the results.
                        let root_event = room
                            .load_or_fetch_event(&thread_root, None)
                            .await
                            .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

                        // Note: the events are still in the reversed order at this point, so
                        // pushing will eventually make it so that the root event is the first.
                        result.chunk.push(root_event);
                    }

                    let mut state = self.inner.state.write().await;

                    if let Some(outcome) = state.finish_thread_network_pagination(
                        thread_root.clone(),
                        prev_token,
                        result.next_batch_token,
                        result.chunk,
                    ) {
                        return Ok(outcome.reached_start);
                    }

                    // fallthrough: restart the pagination.
                    outcome = state.load_more_thread_events_backwards(thread_root.clone());
                }

                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
                    // We're done!
                    return Ok(true);
                }

                LoadMoreEventsBackwardsOutcome::Events { .. } => {
                    // TODO: implement :)
                    unimplemented!("loading from disk for threads is not implemented yet");
                }

                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
                    unreachable!("unused for threads")
                }
            }
        }
    }

    /// Return a [`RoomPagination`] API object useful for running
    /// back-pagination queries in the current room.
    pub fn pagination(&self) -> RoomPagination {
        RoomPagination { inner: self.inner.clone() }
    }

    /// Try to find a single event in this room, starting from the most recent
    /// event.
    ///
    /// **Warning**! It looks into the loaded events from the in-memory linked
    /// chunk **only**. It doesn't look inside the storage.
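    ///
    /// A minimal usage sketch (illustrative only; the `room_cache` handle and
    /// the re-export paths are assumptions):
    ///
    /// ```ignore
    /// # use matrix_sdk::event_cache::RoomEventCache;
    /// # use matrix_sdk::ruma::OwnedEventId;
    /// async fn last_event_id_in_memory(room_cache: &RoomEventCache) -> Option<OwnedEventId> {
    ///     // Walk the in-memory events from the most recent backwards, and return
    ///     // the first event id found.
    ///     room_cache.rfind_map_event_in_memory_by(|event| event.event_id()).await
    /// }
    /// ```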
    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Option<O>
    where
        P: FnMut(&Event) -> Option<O>,
    {
        self.inner.state.read().await.rfind_map_event_in_memory_by(predicate)
    }

    /// Try to find an event by ID in this room.
    ///
    /// It starts by looking into loaded events before looking inside the
    /// storage.
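    ///
    /// A minimal usage sketch (illustrative only; the `room_cache` handle and
    /// the re-export paths are assumptions):
    ///
    /// ```ignore
    /// # use matrix_sdk::event_cache::RoomEventCache;
    /// # use matrix_sdk::ruma::EventId;
    /// async fn lookup(room_cache: &RoomEventCache, event_id: &EventId) {
    ///     match room_cache.find_event(event_id).await {
    ///         Some(event) => println!("found {:?}", event.event_id()),
    ///         None => println!("event not found in the cache"),
    ///     }
    /// }
    /// ```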
    pub async fn find_event(&self, event_id: &EventId) -> Option<Event> {
        self.inner
            .state
            .read()
            .await
            .find_event(event_id)
            .await
            .ok()
            .flatten()
            .map(|(_loc, event)| event)
    }

    /// Try to find an event by ID in this room, along with its related events.
    ///
    /// You can filter which types of related events to retrieve using
    /// `filter`. `None` will retrieve related events of any type.
    ///
    /// The related events are sorted like this:
    ///
    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
    ///   located at the beginning of the array.
    /// - events present in the linked chunk (be it in memory or in the storage)
    ///   will be sorted according to their ordering in the linked chunk.
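    ///
    /// A minimal usage sketch (illustrative only; the `room_cache` handle, the
    /// re-export paths, and the choice of the `m.replace` filter are
    /// assumptions):
    ///
    /// ```ignore
    /// # use matrix_sdk::event_cache::RoomEventCache;
    /// # use matrix_sdk::ruma::{events::relation::RelationType, EventId};
    /// async fn edits_of(room_cache: &RoomEventCache, event_id: &EventId) {
    ///     // Only retrieve `m.replace` (edit) relations of the target event.
    ///     let filter = Some(vec![RelationType::Replacement]);
    ///     if let Some((target, edits)) =
    ///         room_cache.find_event_with_relations(event_id, filter).await
    ///     {
    ///         println!("{:?} has {} edits", target.event_id(), edits.len());
    ///     }
    /// }
    /// ```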
    pub async fn find_event_with_relations(
        &self,
        event_id: &EventId,
        filter: Option<Vec<RelationType>>,
    ) -> Option<(Event, Vec<Event>)> {
        // Search in all loaded or stored events.
        self.inner
            .state
            .read()
            .await
            .find_event_with_relations(event_id, filter.clone())
            .await
            .ok()
            .flatten()
    }

    /// Clear all the storage for this [`RoomEventCache`].
    ///
    /// This will get rid of all the events from the linked chunk and persisted
    /// storage.
    pub async fn clear(&self) -> Result<()> {
        // Clear the linked chunk and persisted storage.
        let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;

        // Notify observers about the update.
        let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
            diffs: updates_as_vector_diffs,
            origin: EventsOrigin::Cache,
        });

        // Notify observers about the generic update.
        let _ = self
            .inner
            .generic_update_sender
            .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });

        Ok(())
    }

    /// Save some events in the event cache, for further retrieval with
    /// [`Self::find_event`].
    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
        if let Err(err) = self.inner.state.write().await.save_event(events).await {
            warn!("couldn't save event in the event cache: {err}");
        }
    }

    /// Return a nice debug string (a vector of lines) for the linked chunk of
    /// events for this room.
    pub async fn debug_string(&self) -> Vec<String> {
        self.inner.state.read().await.room_linked_chunk().debug_string()
    }
}

/// The (non-cloneable) details of the `RoomEventCache`.
pub(super) struct RoomEventCacheInner {
    /// The room id for this room.
    pub(super) room_id: OwnedRoomId,

    pub weak_room: WeakRoom,

    /// Sender part for subscribers to this room.
    pub sender: Sender<RoomEventCacheUpdate>,

    /// State for this room's event cache.
    pub state: RwLock<RoomEventCacheState>,

    /// A notifier that we received a new pagination token.
    pub pagination_batch_token_notifier: Notify,

    pub pagination_status: SharedObservable<RoomPaginationStatus>,

    /// Sender to the auto-shrink channel.
    ///
    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
    /// more details.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// A clone of [`EventCacheInner::generic_update_sender`].
    ///
    /// Whilst `EventCacheInner` handles the generic updates from the sync, or
    /// the storage, it doesn't handle the updates from pagination. Having a
    /// clone here allows accessing it from [`RoomPagination`].
    pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
}

impl RoomEventCacheInner {
    /// Creates a new cache for a room, and subscribes to room updates, so as
    /// to handle new timeline events.
    fn new(
        client: WeakClient,
        state: RoomEventCacheState,
        pagination_status: SharedObservable<RoomPaginationStatus>,
        room_id: OwnedRoomId,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
    ) -> Self {
        let sender = Sender::new(32);
        let weak_room = WeakRoom::new(client, room_id);
        Self {
            room_id: weak_room.room_id().to_owned(),
            weak_room,
            state: RwLock::new(state),
            sender,
            pagination_batch_token_notifier: Default::default(),
            auto_shrink_sender,
            pagination_status,
            generic_update_sender,
        }
    }

    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
        if account_data.is_empty() {
            return;
        }

        let mut handled_read_marker = false;

        trace!("Handling account data");

        for raw_event in account_data {
            match raw_event.deserialize() {
                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
                    // If duplicated, do not forward the read marker multiple times,
                    // to avoid cluttering the update channel.
                    if handled_read_marker {
                        continue;
                    }

                    handled_read_marker = true;

                    // Propagate to observers. (We ignore the error if there aren't any.)
                    let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
                        event_id: ev.content.event_id,
                    });
                }

                Ok(_) => {
                    // We're not interested in other room account data updates,
                    // at this point.
                }

                Err(e) => {
                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
                    warn!(event_type, "Failed to deserialize account data: {e}");
                }
            }
        }
    }

    #[instrument(skip_all, fields(room_id = %self.room_id))]
    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
        self.handle_timeline(
            updates.timeline,
            updates.ephemeral.clone(),
            updates.ambiguity_changes,
        )
        .await?;
        self.handle_account_data(updates.account_data);

        Ok(())
    }

    #[instrument(skip_all, fields(room_id = %self.room_id))]
    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
        self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;

        Ok(())
    }

    /// Handle a [`Timeline`], i.e. new events received by a sync for this
    /// room.
    async fn handle_timeline(
        &self,
        timeline: Timeline,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        if timeline.events.is_empty()
            && timeline.prev_batch.is_none()
            && ephemeral_events.is_empty()
            && ambiguity_changes.is_empty()
        {
            return Ok(());
        }

        // Add all the events to the backend.
        trace!("adding new events");

        let (stored_prev_batch_token, timeline_event_diffs) =
            self.state.write().await.handle_sync(timeline).await?;

        // Now that all events have been added, we can trigger the
        // `pagination_token_notifier`.
        if stored_prev_batch_token {
            self.pagination_batch_token_notifier.notify_one();
        }

        // The order matters here: first send the timeline event diffs, and only then
        // the related events (read receipts, etc.).
        if !timeline_event_diffs.is_empty() {
            let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: timeline_event_diffs,
                origin: EventsOrigin::Sync,
            });

            let _ = self
                .generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
        }

        if !ephemeral_events.is_empty() {
            let _ = self
                .sender
                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
        }

        if !ambiguity_changes.is_empty() {
            let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
        }

        Ok(())
    }
}

/// Internal type to represent the output of
/// [`RoomEventCacheState::load_more_events_backwards`].
#[derive(Debug)]
pub(super) enum LoadMoreEventsBackwardsOutcome {
    /// A gap has been inserted.
    Gap {
        /// The previous batch token to be used as the "end" parameter in the
        /// back-pagination request.
        prev_token: Option<String>,
    },

    /// The start of the timeline has been reached.
    StartOfTimeline,

    /// Events have been inserted.
    Events {
        events: Vec<Event>,
        timeline_event_diffs: Vec<VectorDiff<Event>>,
        reached_start: bool,
    },

    /// The caller must wait for the initial previous-batch token, and retry.
    WaitForInitialPrevToken,
}

// Use a private module to hide `events` from this parent module.
mod private {
    use std::{
        collections::{BTreeMap, HashMap, HashSet},
        sync::{atomic::AtomicUsize, Arc},
    };

    use eyeball::SharedObservable;
    use eyeball_im::VectorDiff;
    use matrix_sdk_base::{
        apply_redaction,
        deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
        event_cache::{
            store::{DynEventCacheStore, EventCacheStoreLock},
            Event, Gap,
        },
        linked_chunk::{
            lazy_loader::{self},
            ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
            OwnedLinkedChunkId, Position, Update,
        },
        serde_helpers::extract_thread_root,
        sync::Timeline,
    };
    use matrix_sdk_common::executor::spawn;
    use ruma::{
        events::{
            relation::RelationType, room::redaction::SyncRoomRedactionEvent,
            AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
        },
        room_version_rules::RoomVersionRules,
        serde::Raw,
        EventId, OwnedEventId, OwnedRoomId,
    };
    use tokio::sync::broadcast::{Receiver, Sender};
    use tracing::{debug, error, instrument, trace, warn};

    use super::{
        super::{deduplicator::DeduplicationOutcome, EventCacheError},
        events::EventLinkedChunk,
        sort_positions_descending, EventLocation, LoadMoreEventsBackwardsOutcome,
    };
    use crate::event_cache::{
        deduplicator::filter_duplicate_events, room::threads::ThreadEventCache,
        BackPaginationOutcome, RoomEventCacheLinkedChunkUpdate, RoomPaginationStatus,
        ThreadEventCacheUpdate,
    };

    /// State for a single room's event cache.
    ///
    /// This contains all the inner mutable states that ought to be updated at
    /// the same time.
    pub struct RoomEventCacheState {
        /// The room this state relates to.
        room: OwnedRoomId,

        /// The rules for the version of this room.
        room_version_rules: RoomVersionRules,

        /// Whether thread support has been enabled for the event cache.
        enabled_thread_support: bool,

        /// Reference to the underlying backing store.
        store: EventCacheStoreLock,

        /// The loaded events for the current room, that is, the in-memory
        /// linked chunk for this room.
        room_linked_chunk: EventLinkedChunk,

        /// Threads present in this room.
        ///
        /// Keyed by the thread root event ID.
        threads: HashMap<OwnedEventId, ThreadEventCache>,

        /// Have we ever waited for a previous-batch-token to come from sync, in
        /// the context of pagination? We do this at most once per room,
        /// the first time we try to run backward pagination. We reset
        /// that upon clearing the timeline events.
        pub waited_for_initial_prev_token: bool,

        pagination_status: SharedObservable<RoomPaginationStatus>,

        /// See doc comment of
        /// [`super::super::EventCacheInner::linked_chunk_update_sender`].
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

        /// An atomic count of the current number of subscribers of the
        /// [`super::RoomEventCache`].
        pub(super) subscriber_count: Arc<AtomicUsize>,
    }

    impl RoomEventCacheState {
        /// Create a new state, or reload it from storage if it's been enabled.
        ///
        /// Not all events are going to be loaded. Only a portion of them. The
        /// [`EventLinkedChunk`] relies on a [`LinkedChunk`] to store all
        /// events. Only the last chunk will be loaded. It means the
        /// events are loaded from the most recent to the oldest. To
        /// load more events, see [`Self::load_more_events_backwards`].
        ///
        /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
        pub async fn new(
            room_id: OwnedRoomId,
            room_version_rules: RoomVersionRules,
            enabled_thread_support: bool,
            linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
            store: EventCacheStoreLock,
            pagination_status: SharedObservable<RoomPaginationStatus>,
        ) -> Result<Self, EventCacheError> {
            let store_lock = store.lock().await?;

            let linked_chunk_id = LinkedChunkId::Room(&room_id);

            // Load the full linked chunk's metadata, so as to feed the order tracker.
            //
            // If loading the full linked chunk failed, we'll clear the event cache, as it
            // indicates that at some point, there's some malformed data.
            let full_linked_chunk_metadata =
                match Self::load_linked_chunk_metadata(&*store_lock, linked_chunk_id).await {
                    Ok(metas) => metas,
                    Err(err) => {
                        error!(
                            "error when loading a linked chunk's metadata from the store: {err}"
                        );

                        // Try to clear storage for this room.
                        store_lock
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        // Restart with an empty linked chunk.
                        None
                    }
                };

            let linked_chunk = match store_lock
                .load_last_chunk(linked_chunk_id)
                .await
                .map_err(EventCacheError::from)
                .and_then(|(last_chunk, chunk_identifier_generator)| {
                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
                        .map_err(EventCacheError::from)
                }) {
                Ok(linked_chunk) => linked_chunk,
                Err(err) => {
                    error!(
                        "error when loading a linked chunk's latest chunk from the store: {err}"
                    );

                    // Try to clear storage for this room.
                    store_lock
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    None
                }
            };

            let room_linked_chunk = EventLinkedChunk::with_initial_linked_chunk(
                linked_chunk,
                full_linked_chunk_metadata,
            );

            // The threads mapping is intentionally empty at start, since we're going to
            // reload threads lazily, as soon as we need to (based on external
            // subscribers) or when we get new information about those (from
            // sync).
            let threads = HashMap::new();

            Ok(Self {
                room: room_id,
                room_version_rules,
                enabled_thread_support,
                store,
                room_linked_chunk,
                threads,
                waited_for_initial_prev_token: false,
                subscriber_count: Default::default(),
                pagination_status,
                linked_chunk_update_sender,
            })
        }

        /// Load a linked chunk's full metadata, making sure the chunks are
        /// correctly chained according to their links.
        ///
        /// Returns `None` if there's no such linked chunk in the store, or an
        /// error if the linked chunk is malformed.
        async fn load_linked_chunk_metadata(
            store: &DynEventCacheStore,
            linked_chunk_id: LinkedChunkId<'_>,
        ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
            let mut all_chunks = store
                .load_all_chunks_metadata(linked_chunk_id)
                .await
                .map_err(EventCacheError::from)?;

            if all_chunks.is_empty() {
                // There are no chunks, so there's nothing to do.
                return Ok(None);
            }

            // Transform the vector into a hashmap, for quick lookup of the predecessors.
            let chunk_map: HashMap<_, _> =
                all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();

            // Find a last chunk.
            let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
            let Some(last) = iter.next() else {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: "no last chunk found".to_owned(),
                });
            };

            // There must be at most one last chunk.
            if let Some(other_last) = iter.next() {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "chunks {} and {} both claim to be last chunks",
                        last.identifier.index(),
                        other_last.identifier.index()
                    ),
                });
            }

            // Rewind the chain back to the first chunk, and do some checks at the same
            // time.
            let mut seen = HashSet::new();
            let mut current = last;
            loop {
                // If we've already seen this chunk, there's a cycle somewhere.
                if !seen.insert(current.identifier) {
                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
                        details: format!(
                            "cycle detected in linked chunk at {}",
                            current.identifier.index()
                        ),
                    });
                }

                let Some(prev_id) = current.previous else {
                    // If there's no previous chunk, we're done.
                    if seen.len() != all_chunks.len() {
                        return Err(EventCacheError::InvalidLinkedChunkMetadata {
                            details: format!(
                                "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
                                seen.len(),
                                all_chunks.len()
                            ),
                        });
                    }
                    break;
                };

                // If the previous chunk is not in the map, then it's unknown
                // and missing.
                let Some(pred_meta) = chunk_map.get(&prev_id) else {
                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
                        details: format!(
                            "missing predecessor {} chunk for {}",
                            prev_id.index(),
                            current.identifier.index()
                        ),
                    });
                };

                // If the previous chunk isn't connected to the next, then the link is invalid.
                if pred_meta.next != Some(current.identifier) {
                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
                        details: format!(
                            "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
                            pred_meta.identifier.index(),
                            pred_meta.next.map(|chunk_id| chunk_id.index()),
                            current.identifier.index()
                        ),
                    });
                }

                current = *pred_meta;
            }

            // At this point, `current` is the identifier of the first chunk.
            //
            // Reorder the resulting vector, by going through the chain of `next` links, and
            // swapping items into their final position.
            //
            // Invariant in this loop: all items in [0..i[ are in their final, correct
            // position.
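            //
            // For example, if the chain is 2 -> 0 -> 1 (each arrow following a `next`
            // link), the loop below swaps the metadata entries so that the vector ends
            // up ordered as [2, 0, 1].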
            let mut current = current.identifier;
            for i in 0..all_chunks.len() {
                // Find the target metadata.
                let j = all_chunks
                    .iter()
                    .rev()
                    .position(|meta| meta.identifier == current)
                    .map(|j| all_chunks.len() - 1 - j)
                    .expect("the target chunk must be present in the metadata");
                if i != j {
                    all_chunks.swap(i, j);
                }
                if let Some(next) = all_chunks[i].next {
                    current = next;
                }
            }

            Ok(Some(all_chunks))
        }

        /// Given a fully-loaded linked chunk with no gaps, return the
        /// [`LoadMoreEventsBackwardsOutcome`] expected for this room's cache.
        fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome {
            // If we never received events for this room, this means we've never
            // received a sync for that room, because every room must have at least a
            // room creation event. Otherwise, we have reached the start of the
            // timeline.
            if self.room_linked_chunk.events().next().is_some() {
                // If there's at least one event, this means we've reached the start of the
                // timeline, since the chunk is fully loaded.
                trace!("chunk is fully loaded and non-empty: reached_start=true");
                LoadMoreEventsBackwardsOutcome::StartOfTimeline
            } else if !self.waited_for_initial_prev_token {
                // There are no events. Since we haven't done so yet, wait for an initial
                // previous-batch token.
                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken
            } else {
                // Otherwise, we've already waited, *and* received no previous-batch token from
                // the sync, *and* there are still no events in the fully-loaded
                // chunk: start back-pagination from the end of the room.
                LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
            }
        }

        /// Load more events backwards if the last chunk is **not** a gap.
        pub(in super::super) async fn load_more_events_backwards(
            &mut self,
        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
            // If any in-memory chunk is a gap, don't load more events, and let the caller
            // resolve the gap.
            if let Some(prev_token) = self.room_linked_chunk.rgap().map(|gap| gap.prev_token) {
                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
            }

            // Because `first_chunk` is not `Send`, get this information before the
            // `.await` point, so that this `Future` can implement `Send`.
            let first_chunk_identifier = self
                .room_linked_chunk
                .chunks()
                .next()
                .expect("a linked chunk is never empty")
                .identifier();

            let store = self.store.lock().await?;

            // The first chunk is not a gap, we can load its previous chunk.
            let linked_chunk_id = LinkedChunkId::Room(&self.room);
            let new_first_chunk = match store
                .load_previous_chunk(linked_chunk_id, first_chunk_identifier)
                .await
            {
                Ok(Some(new_first_chunk)) => {
                    // All good, let's continue with this chunk.
                    new_first_chunk
                }

                Ok(None) => {
                    // There's no previous chunk. The chunk is now fully-loaded. Conclude.
                    return Ok(self.conclude_load_more_for_fully_loaded_chunk());
                }

                Err(err) => {
                    error!("error when loading the previous chunk of a linked chunk: {err}");

                    // Clear storage for this room.
                    store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;

                    // Return the error.
                    return Err(err.into());
                }
            };

            let chunk_content = new_first_chunk.content.clone();

            // We've reached the start on disk if and only if there was no chunk prior to
            // the one we just loaded.
            //
            // This value is correct if and only if it is used for a chunk content of kind
            // `Items`.
            let reached_start = new_first_chunk.previous.is_none();

            if let Err(err) = self.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk) {
                error!("error when inserting the previous chunk into its linked chunk: {err}");

                // Clear storage for this room.
                store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;

                // Return the error.
                return Err(err.into());
            }

            // ⚠️ Let's not propagate the updates to the store! We already have this data
            // in the store! Let's drain them.
            let _ = self.room_linked_chunk.store_updates().take();

            // However, we want to get updates as `VectorDiff`s.
            let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();

            Ok(match chunk_content {
                ChunkContent::Gap(gap) => {
                    trace!("reloaded chunk from disk (gap)");
                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
                }

                ChunkContent::Items(events) => {
                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
                    LoadMoreEventsBackwardsOutcome::Events {
                        events,
                        timeline_event_diffs,
                        reached_start,
                    }
                }
            })
        }

        /// If storage is enabled, unload all the chunks, then reload only the
        /// last one.
        ///
        /// This only resets the in-memory linked chunk; callers that need to
        /// notify observers can collect the resulting `VectorDiff` updates
        /// afterwards (see [`Self::auto_shrink_if_no_subscribers`]).
        pub(super) async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
            let store_lock = self.store.lock().await?;

            // Attempt to load the last chunk.
            let linked_chunk_id = LinkedChunkId::Room(&self.room);
            let (last_chunk, chunk_identifier_generator) =
                match store_lock.load_last_chunk(linked_chunk_id).await {
                    Ok(pair) => pair,

                    Err(err) => {
                        // If loading the last chunk failed, clear the entire linked chunk.
                        error!("error when reloading a linked chunk from memory: {err}");

                        // Clear storage for this room.
                        store_lock
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        // Restart with an empty linked chunk.
                        (None, ChunkIdentifierGenerator::new_from_scratch())
                    }
                };

            debug!("unloading the linked chunk, and resetting it to its last chunk");

            // Remove all the chunks from the linked chunk, except for the last one, and
            // update the chunk identifier generator.
            if let Err(err) =
                self.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
            {
                error!("error when replacing the linked chunk: {err}");
                return self.reset_internal().await;
            }

            // Let pagination observers know that we may not have reached the start of the
            // timeline.
            // TODO: likely need to cancel any ongoing pagination.
            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            // Don't propagate those updates to the store; we're only doing this for the
            // in-memory representation. Let's drain those store updates.
            let _ = self.room_linked_chunk.store_updates().take();

            Ok(())
        }

        /// Automatically shrink the room if there are no more subscribers, as
        /// indicated by the atomic number of active subscribers.
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub(crate) async fn auto_shrink_if_no_subscribers(
            &mut self,
        ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
            let subscriber_count = self.subscriber_count.load(std::sync::atomic::Ordering::SeqCst);

            trace!(subscriber_count, "received request to auto-shrink");

            if subscriber_count == 0 {
                // There are no more subscribers, so we can shrink the events data
                // structure to its last chunk.
                self.shrink_to_last_chunk().await?;
                Ok(Some(self.room_linked_chunk.updates_as_vector_diffs()))
            } else {
                Ok(None)
            }
        }

        #[cfg(test)]
        pub(crate) async fn force_shrink_to_last_chunk(
            &mut self,
        ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
            self.shrink_to_last_chunk().await?;
            Ok(self.room_linked_chunk.updates_as_vector_diffs())
        }

        pub(crate) fn room_event_order(&self, event_pos: Position) -> Option<usize> {
            self.room_linked_chunk.event_order(event_pos)
        }

        /// Removes the bundled relations from an event, if they were present.
        ///
        /// Only replaces the event if it contained bundled relations.
        fn strip_relations_if_present<T>(event: &mut Raw<T>) {
            // We're going to get rid of the `unsigned`/`m.relations` field, if it's
            // present.
            // Use a closure that returns an option so we can quickly short-circuit.
            let mut closure = || -> Option<()> {
                let mut val: serde_json::Value = event.deserialize_as().ok()?;
                let unsigned = val.get_mut("unsigned")?;
                let unsigned_obj = unsigned.as_object_mut()?;
                if unsigned_obj.remove("m.relations").is_some() {
                    *event = Raw::new(&val).ok()?.cast_unchecked();
                }
                None
            };
            let _ = closure();
        }

        fn strip_relations_from_event(ev: &mut Event) {
            match &mut ev.kind {
                TimelineEventKind::Decrypted(decrypted) => {
                    // Remove all information about encryption info for
                    // the bundled events.
                    decrypted.unsigned_encryption_info = None;

                    // Remove the `unsigned`/`m.relations` field, if needs be.
                    Self::strip_relations_if_present(&mut decrypted.event);
                }

                TimelineEventKind::UnableToDecrypt { event, .. }
                | TimelineEventKind::PlainText { event } => {
                    Self::strip_relations_if_present(event);
                }
            }
        }

        /// Strips the bundled relations from a collection of events.
        fn strip_relations_from_events(items: &mut [Event]) {
            for ev in items.iter_mut() {
                Self::strip_relations_from_event(ev);
            }
        }

        /// Remove events by their position, in `EventLinkedChunk` and in
        /// `EventCacheStore`.
        ///
        /// This method is purposely isolated because it must ensure that
        /// positions are sorted appropriately or it can be disastrous.
        #[instrument(skip_all)]
        async fn remove_events(
            &mut self,
            in_memory_events: Vec<(OwnedEventId, Position)>,
            in_store_events: Vec<(OwnedEventId, Position)>,
        ) -> Result<(), EventCacheError> {
            // In-store events.
            if !in_store_events.is_empty() {
                let mut positions = in_store_events
                    .into_iter()
                    .map(|(_event_id, position)| position)
                    .collect::<Vec<_>>();

                sort_positions_descending(&mut positions);

                let updates = positions
                    .into_iter()
                    .map(|pos| Update::RemoveItem { at: pos })
                    .collect::<Vec<_>>();

                self.apply_store_only_updates(updates).await?;
            }

            // In-memory events.
            if in_memory_events.is_empty() {
                // Nothing else to do, return early.
                return Ok(());
            }

            // `remove_events_by_position` is responsible for sorting positions.
            self.room_linked_chunk
                .remove_events_by_position(
                    in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
                )
                .expect("failed to remove an event");

            self.propagate_changes().await
        }

        /// Propagate changes to the underlying storage.
        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
            let updates = self.room_linked_chunk.store_updates().take();
            self.send_updates_to_store(updates).await
        }

        /// Apply some updates that are effective only on the store itself.
        ///
        /// This method should be used only for updates that happen *outside*
        /// the in-memory linked chunk. Such updates must be applied
        /// onto the ordering tracker as well as to the persistent
        /// storage.
        async fn apply_store_only_updates(
            &mut self,
            updates: Vec<Update<Event, Gap>>,
        ) -> Result<(), EventCacheError> {
            self.room_linked_chunk.order_tracker.map_updates(&updates);
            self.send_updates_to_store(updates).await
        }

        async fn send_updates_to_store(
            &mut self,
            mut updates: Vec<Update<Event, Gap>>,
        ) -> Result<(), EventCacheError> {
            if updates.is_empty() {
                return Ok(());
            }

            // Strip relations from updates which insert or replace items.
            for update in updates.iter_mut() {
                match update {
                    Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
                    Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
                    // Other update kinds don't involve adding new events.
                    Update::NewItemsChunk { .. }
                    | Update::NewGapChunk { .. }
                    | Update::RemoveChunk(_)
                    | Update::RemoveItem { .. }
                    | Update::DetachLastItems { .. }
                    | Update::StartReattachItems
                    | Update::EndReattachItems
                    | Update::Clear => {}
                }
            }

            // Spawn a task to make sure that all the changes are effectively forwarded to
            // the store, even if the call to this method gets aborted.
            //
            // The store cross-process locking involves an actual mutex, which ensures that
            // storing updates happens in the expected order.

            let store = self.store.clone();
            let room_id = self.room.clone();
            let cloned_updates = updates.clone();

            spawn(async move {
                let store = store.lock().await?;

                trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
                let linked_chunk_id = LinkedChunkId::Room(&room_id);
                store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
                trace!("linked chunk updates applied");

                super::Result::Ok(())
            })
            .await
            .expect("joining failed")?;

            // Forward that the store got updated to observers.
            let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
                linked_chunk_id: OwnedLinkedChunkId::Room(self.room.clone()),
                updates,
            });

            Ok(())
        }

        /// Reset this data structure as if it were brand new.
        ///
        /// Return a single diff update that is a clear of all events; as a
        /// result, the caller may override any pending diff updates
        /// with the result of this function.
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
            self.reset_internal().await?;

            let diff_updates = self.room_linked_chunk.updates_as_vector_diffs();

            // Ensure the contract defined in the doc comment is true:
            debug_assert_eq!(diff_updates.len(), 1);
            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));

            Ok(diff_updates)
        }

        async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
            self.room_linked_chunk.reset();

            // No need to update the thread summaries: the room events are gone because
            // of the reset of `room_linked_chunk`.
            //
            // Clear the threads.
            for thread in self.threads.values_mut() {
                thread.clear();
            }

            self.propagate_changes().await?;

            // Reset the pagination state too: pretend we never waited for the initial
            // prev-batch token, and indicate that we're not at the start of the
            // timeline, since we don't know about that anymore.
            self.waited_for_initial_prev_token = false;
            // TODO: likely must cancel any ongoing back-paginations too
            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            Ok(())
        }

        /// Returns a read-only reference to the underlying room linked chunk.
        pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
            &self.room_linked_chunk
        }

        /// Find a single event in this room, starting from the most recent event.
        ///
        /// **Warning**! It looks into the loaded events from the in-memory
        /// linked chunk **only**. It doesn't look inside the storage,
        /// contrary to [`Self::find_event`].
        pub fn rfind_map_event_in_memory_by<O, P>(&self, mut predicate: P) -> Option<O>
        where
            P: FnMut(&Event) -> Option<O>,
        {
            self.room_linked_chunk.revents().find_map(|(_position, event)| predicate(event))
        }

        /// Find a single event in this room.
        ///
        /// It starts by looking into loaded events in `EventLinkedChunk` before
        /// looking inside the storage.
        pub async fn find_event(
            &self,
            event_id: &EventId,
        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
            // There are supposedly fewer events loaded in memory than in the store. Let's
            // start by looking up in the `EventLinkedChunk`.
            for (position, event) in self.room_linked_chunk.revents() {
                if event.event_id().as_deref() == Some(event_id) {
                    return Ok(Some((EventLocation::Memory(position), event.clone())));
                }
            }

            let store = self.store.lock().await?;

            Ok(store
                .find_event(&self.room, event_id)
                .await?
                .map(|event| (EventLocation::Store, event)))
        }

        /// Find an event and all its relations in the persisted storage.
        ///
1360        /// This goes straight to the database, as a simplification; we don't
1361        /// expect to have to look up in-memory events, nor do we expect all
1362        /// the related events to be loaded in memory.
1363        ///
1364        /// The related events are sorted like this:
1365        /// - events saved out-of-band with
1366        ///   [`super::RoomEventCache::save_events`] will be located at the
1367        ///   beginning of the array.
1368        /// - events present in the linked chunk (be it in memory or in the
1369        ///   database) will be sorted according to their ordering in the linked
1370        ///   chunk.
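        ///
        /// A usage sketch (illustrative only; `state` stands for a
        /// `RoomEventCacheState` and `event_id` for the target event's id):
        ///
        /// ```ignore
        /// // Fetch an event along with its edits only.
        /// if let Some((target, related)) = state
        ///     .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
        ///     .await?
        /// {
        ///     // `related` is sorted as documented above: out-of-band saves first,
        ///     // then linked-chunk order.
        /// }
        /// ```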
1371        pub async fn find_event_with_relations(
1372            &self,
1373            event_id: &EventId,
1374            filters: Option<Vec<RelationType>>,
1375        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1376            let store = self.store.lock().await?;
1377
1378            // First, hit storage to get the target event and its related events.
1379            let found = store.find_event(&self.room, event_id).await?;
1380
1381            let Some(target) = found else {
1382                // We haven't found the event: return early.
1383                return Ok(None);
1384            };
1385
1386            // Then, initialize the stack with all the related events, to find the
1387            // transitive closure of all the related events.
1388            let mut related =
1389                store.find_event_relations(&self.room, event_id, filters.as_deref()).await?;
1390            let mut stack =
1391                related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();
1392
1393            // Also keep track of already seen events, in case there's a loop in the
1394            // relation graph.
1395            let mut already_seen = HashSet::new();
1396            already_seen.insert(event_id.to_owned());
1397
1398            let mut num_iters = 1;
1399
1400            // Find the related event for each previously-related event.
1401            while let Some(event_id) = stack.pop() {
1402                if !already_seen.insert(event_id.clone()) {
1403                    // Skip events we've already seen.
1404                    continue;
1405                }
1406
1407                let other_related =
1408                    store.find_event_relations(&self.room, &event_id, filters.as_deref()).await?;
1409
1410                stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
1411                related.extend(other_related);
1412
1413                num_iters += 1;
1414            }
1415
1416            trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
1417
1418            // Sort the results by their positions in the linked chunk, if available.
1419            //
1420            // If an event doesn't have a known position, it goes to the start of the array.
1421            related.sort_by(|(_, lhs), (_, rhs)| {
1422                use std::cmp::Ordering;
1423                match (lhs, rhs) {
1424                    (None, None) => Ordering::Equal,
1425                    (None, Some(_)) => Ordering::Less,
1426                    (Some(_), None) => Ordering::Greater,
1427                    (Some(lhs), Some(rhs)) => {
1428                        let lhs = self.room_event_order(*lhs);
1429                        let rhs = self.room_event_order(*rhs);
1430
1431                        // The events should have a definite position, but in case they don't,
1432                        // still consider that not having a position means the event ends up at
1433                        // the start of the array.
1434                        match (lhs, rhs) {
1435                            (None, None) => Ordering::Equal,
1436                            (None, Some(_)) => Ordering::Less,
1437                            (Some(_), None) => Ordering::Greater,
1438                            (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
1439                        }
1440                    }
1441                }
1442            });
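            // For instance, given positions [Some(p2), None, Some(p0)] (with p0 ordered
            // before p2 in the linked chunk), the sort above yields
            // [None, Some(p0), Some(p2)]: unknown positions first, then linked-chunk order.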
1443
1444            // Keep only the events, not their positions.
1445            let related = related.into_iter().map(|(event, _pos)| event).collect();
1446
1447            Ok(Some((target, related)))
1448        }
1449
1450        /// Post-process new events, after they have been added to the in-memory
1451        /// linked chunk.
1452        ///
1453        /// Flushes updates to disk first.
1454        async fn post_process_new_events(
1455            &mut self,
1456            events: Vec<Event>,
1457            is_sync: bool,
1458        ) -> Result<(), EventCacheError> {
1459            // Update the store before doing the post-processing.
1460            self.propagate_changes().await?;
1461
1462            let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();
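            // Illustrative shape: if the sync contains two replies to thread $a and one
            // reply to thread $b, this ends up as {$a: [e1, e2], $b: [e3]} (event ids
            // made up for the example).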
1463
1464            for event in events {
1465                self.maybe_apply_new_redaction(&event).await?;
1466
1467                if self.enabled_thread_support {
1468                    if let Some(thread_root) = extract_thread_root(event.raw()) {
1469                        new_events_by_thread.entry(thread_root).or_default().push(event.clone());
1470                    } else if let Some(event_id) = event.event_id() {
1471                        // If we spot the root of a known thread, add the event to its linked chunk.
1472                        if self.threads.contains_key(&event_id) {
1473                            new_events_by_thread.entry(event_id).or_default().push(event.clone());
1474                        }
1475                    }
1476                }
1477
1478                // Save a bundled thread event, if there was one.
1479                if let Some(bundled_thread) = event.bundled_latest_thread_event {
1480                    self.save_event([*bundled_thread]).await?;
1481                }
1482            }
1483
1484            self.update_threads(new_events_by_thread, is_sync).await?;
1485
1486            Ok(())
1487        }
1488
1489        fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
1490            // TODO: when there's persistent storage, try to lazily reload from disk, if
1491            // missing from memory.
1492            self.threads.entry(root_event_id.clone()).or_insert_with(|| {
1493                ThreadEventCache::new(
1494                    self.room.clone(),
1495                    root_event_id,
1496                    self.linked_chunk_update_sender.clone(),
1497                )
1498            })
1499        }
1500
1501        #[instrument(skip_all)]
1502        async fn update_threads(
1503            &mut self,
1504            new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
1505            is_sync: bool,
1506        ) -> Result<(), EventCacheError> {
1507            for (thread_root, new_events) in new_events_by_thread {
1508                let thread_cache = self.get_or_reload_thread(thread_root.clone());
1509
1510                // If we're not in sync mode, we're receiving events from a room pagination: as
1511                // we don't know where they should be put in a thread linked
1512                // chunk, we don't try to be smart and include them. That's for
1513                // the best.
1514                if is_sync {
1515                    thread_cache.add_live_events(new_events);
1516                }
1517
1518                // Add a thread summary to the (room) event which is the thread root, if we
1519                // know about it.
1520
1521                let last_event_id = thread_cache.latest_event_id();
1522
1523                let Some((location, mut target_event)) = self.find_event(&thread_root).await?
1524                else {
1525                    trace!(%thread_root, "thread root event is missing from the linked chunk");
1526                    continue;
1527                };
1528
1529                let prev_summary = target_event.thread_summary.summary();
1530                let mut latest_reply =
1531                    prev_summary.as_ref().and_then(|summary| summary.latest_reply.clone());
1532
1533                // Recompute the thread summary, if need be.
1534
1535                // Read the latest number of thread replies from the store.
1536                //
1537                // Implementation note: since this is based on the `m.relates_to` field, and
1538                // that field can only be present on room messages, we don't have to
1539                // worry about filtering out aggregation events (like
1540                // reactions/edits/etc.). Pretty neat, huh?
1541                let num_replies = {
1542                    let store_guard = &*self.store.lock().await?;
1543                    let related_thread_events = store_guard
1544                        .find_event_relations(
1545                            &self.room,
1546                            &thread_root,
1547                            Some(&[RelationType::Thread]),
1548                        )
1549                        .await?;
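                    // Saturate at `u32::MAX` rather than panicking, in the unlikely case
                    // the reply count doesn't fit in a `u32`.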
1550                    related_thread_events.len().try_into().unwrap_or(u32::MAX)
1551                };
1552
1553                if let Some(last_event_id) = last_event_id {
1554                    latest_reply = Some(last_event_id);
1555                }
1556
1557                let new_summary = ThreadSummary { num_replies, latest_reply };
1558
1559                if prev_summary == Some(&new_summary) {
1560                    trace!(%thread_root, "thread summary is already up-to-date");
1561                    continue;
1562                }
1563
1564                // Trigger an update to observers.
1565                target_event.thread_summary = ThreadSummaryStatus::Some(new_summary);
1566                self.replace_event_at(location, target_event).await?;
1567            }
1568
1569            Ok(())
1570        }
1571
1572        /// Replaces a single event, be it saved in memory or in the store.
1573        ///
1574        /// If it was saved in memory, this will emit a notification to
1575        /// observers that a single item has been replaced. Otherwise,
1576        /// such a notification is not emitted, because observers are
1577        /// unlikely to observe the store updates directly.
1578        async fn replace_event_at(
1579            &mut self,
1580            location: EventLocation,
1581            event: Event,
1582        ) -> Result<(), EventCacheError> {
1583            match location {
1584                EventLocation::Memory(position) => {
1585                    self.room_linked_chunk
1586                        .replace_event_at(position, event)
1587                        .expect("should have been a valid position of an item");
1588                    // We just changed the in-memory representation; synchronize this with
1589                    // the store.
1590                    self.propagate_changes().await?;
1591                }
1592                EventLocation::Store => {
1593                    self.save_event([event]).await?;
1594                }
1595            }
1596
1597            Ok(())
1598        }
1599
1600        /// If the given event is a redaction, try to retrieve the
1601        /// to-be-redacted event in the chunk, and replace it by the
1602        /// redacted form.
1603        #[instrument(skip_all)]
1604        async fn maybe_apply_new_redaction(
1605            &mut self,
1606            event: &Event,
1607        ) -> Result<(), EventCacheError> {
1608            let raw_event = event.raw();
1609
1610            // Do not deserialise the entire event if we aren't certain it's an
1611            // `m.room.redaction`. It saves a non-negligible amount of computation.
1612            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
1613                raw_event.get_field::<MessageLikeEventType>("type")
1614            else {
1615                return Ok(());
1616            };
1617
1618            // It is an `m.room.redaction`! We can deserialize it entirely.
1619
1620            let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
1621                redaction,
1622            ))) = raw_event.deserialize()
1623            else {
1624                return Ok(());
1625            };
1626
1627            let Some(event_id) = redaction.redacts(&self.room_version_rules.redaction) else {
1628                warn!("missing target event id from the redaction event");
1629                return Ok(());
1630            };
1631
1632            // Replace the redacted event by a redacted form, if we knew about it.
1633            let Some((location, mut target_event)) = self.find_event(event_id).await? else {
1634                trace!("redacted event is missing from the linked chunk");
1635                return Ok(());
1636            };
1637
1638            // Don't redact already redacted events.
1639            if let Ok(deserialized) = target_event.raw().deserialize() {
1640                match deserialized {
1641                    AnySyncTimelineEvent::MessageLike(ev) => {
1642                        if ev.is_redacted() {
1643                            return Ok(());
1644                        }
1645                    }
1646                    AnySyncTimelineEvent::State(ev) => {
1647                        if ev.is_redacted() {
1648                            return Ok(());
1649                        }
1650                    }
1651                }
1652            }
1653
1654            if let Some(redacted_event) = apply_redaction(
1655                target_event.raw(),
1656                event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
1657                &self.room_version_rules.redaction,
1658            ) {
1659                // It's safe to cast `redacted_event` here:
1660                // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
1661                //   when calling .raw(), so it's still one under the hood.
1662                // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
1663                target_event.replace_raw(redacted_event.cast_unchecked());
1664
1665                self.replace_event_at(location, target_event).await?;
1666            }
1667
1668            Ok(())
1669        }
1670
1671        /// Save one or more events into the database, without notifying observers.
1672        ///
1673        /// Note: if an event was already saved as part of a linked chunk and
1674        /// its event id may have changed, it's not safe to use this method,
1675        /// because it may break the link between the chunk and the event.
1676        /// Instead, an update to the linked chunk must be used.
1677        pub async fn save_event(
1678            &self,
1679            events: impl IntoIterator<Item = Event>,
1680        ) -> Result<(), EventCacheError> {
1681            let store = self.store.clone();
1682            let room_id = self.room.clone();
1683            let events = events.into_iter().collect::<Vec<_>>();
1684
1685            // Spawn a task so the save is uninterrupted by task cancellation.
1686            spawn(async move {
1687                let store = store.lock().await?;
1688                for event in events {
1689                    store.save_event(&room_id, event).await?;
1690                }
1691                super::Result::Ok(())
1692            })
1693            .await
1694            .expect("joining failed")?;
1695
1696            Ok(())
1697        }
1698
1699        /// Handle the result of a sync.
1700        ///
1701        /// It may send room event cache updates to the given sender, if it
1702        /// generated any of those.
1703        ///
1704        /// Returns `true` for the first part of the tuple if a new gap
1705        /// (previous-batch token) has been inserted, `false` otherwise.
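        ///
        /// Sketch of a caller's side (illustrative only; `state` stands for a
        /// `RoomEventCacheState`):
        ///
        /// ```ignore
        /// let (has_new_gap, diffs) = state.handle_sync(timeline).await?;
        /// if !diffs.is_empty() {
        ///     // Forward `diffs` to observers, e.g. via `RoomEventCacheUpdate`.
        /// }
        /// ```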
1706        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1707        pub async fn handle_sync(
1708            &mut self,
1709            mut timeline: Timeline,
1710        ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
1711            let mut prev_batch = timeline.prev_batch.take();
1712
1713            let DeduplicationOutcome {
1714                all_events: events,
1715                in_memory_duplicated_event_ids,
1716                in_store_duplicated_event_ids,
1717                non_empty_all_duplicates: all_duplicates,
1718            } = filter_duplicate_events(
1719                &self.store,
1720                LinkedChunkId::Room(self.room.as_ref()),
1721                &self.room_linked_chunk,
1722                timeline.events,
1723            )
1724            .await?;
1725
1726            // If the timeline isn't limited, and we already knew about some past events,
1727            // then we definitely know what the timeline head is (either we know
1728            // about all the events persisted in storage, or we have a gap
1729            // somewhere). In this case, we can ditch the previous-batch
1730            // token, which is an optimization to avoid unnecessary future back-pagination
1731            // requests.
1732            //
1733            // We can also ditch it if we knew about all the events that came from sync,
1734            // namely, they were all deduplicated. In this case, using the
1735            // previous-batch token would only result in fetching other events we
1736            // knew about. This is slightly incorrect in the presence of
1737            // network splits, but this has shown to be Good Enough™.
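            // Note: `&&` binds tighter than `||`, so the condition below reads as
            // "(not limited and at least one event is loaded in memory) or all the new
            // events were duplicates".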
1738            if !timeline.limited && self.room_linked_chunk.events().next().is_some()
1739                || all_duplicates
1740            {
1741                prev_batch = None;
1742            }
1743
1744            if prev_batch.is_some() {
1745                // Sad time: there's a gap, somewhere, in the timeline, and there's at least one
1746                // non-duplicated event. We don't know which threads might have gaps, so we
1747                // must invalidate them all :(
1748                // TODO(bnjbvr): figure out a better catchup mechanism for threads.
1749                let mut summaries_to_update = Vec::new();
1750
1751                for (thread_root, thread) in self.threads.iter_mut() {
1752                    // Empty the thread's linked chunk.
1753                    thread.clear();
1754
1755                    summaries_to_update.push(thread_root.clone());
1756                }
1757
1758                // Now, update the summaries to indicate that we're not sure what the latest
1759                // thread event is. The thread count can remain as is, as it might still be
1760                // valid, and there's no good value to reset it to, anyway.
1761                for thread_root in summaries_to_update {
1762                    let Some((location, mut target_event)) = self.find_event(&thread_root).await?
1763                    else {
1764                        trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
1765                        continue;
1766                    };
1767
1768                    if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
1769                        prev_summary.latest_reply = None;
1770
1771                        target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);
1772
1773                        self.replace_event_at(location, target_event).await?;
1774                    }
1775                }
1776            }
1777
1778            if all_duplicates {
1779                // No new events and no gap (per the previous check), thus no need to change the
1780                // room state. We're done!
1781                return Ok((false, Vec::new()));
1782            }
1783
1784            let has_new_gap = prev_batch.is_some();
1785
1786            // If we've never waited for an initial previous-batch token, and we've now
1787            // inserted a gap, no need to wait for a previous-batch token later.
1788            if !self.waited_for_initial_prev_token && has_new_gap {
1789                self.waited_for_initial_prev_token = true;
1790            }
1791
1792            // Remove the old duplicated events.
1793            //
1794            // We don't have to worry about the removals changing the positions of the
1795            // existing events, because we are pushing all _new_ `events` at the back.
1796            self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1797                .await?;
1798
1799            self.room_linked_chunk
1800                .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);
1801
1802            self.post_process_new_events(events, true).await?;
1803
1804            if timeline.limited && has_new_gap {
1805                // If there was a previous batch token for a limited timeline, unload the chunks
1806                // so the linked chunk only contains the last one; otherwise, there might be a
1807                // valid gap in between, and observers may not render it (yet).
1808                //
1809                // We must do this *after* persisting these events to storage (in
1810                // `post_process_new_events`).
1811                self.shrink_to_last_chunk().await?;
1812            }
1813
1814            let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
1815
1816            Ok((has_new_gap, timeline_event_diffs))
1817        }
1818
1819        /// Handle the result of a single back-pagination request.
1820        ///
1821        /// If the `prev_token` is set, then this function will check that the
1822        /// corresponding gap is present in the in-memory linked chunk.
1823        /// If it's not the case, `Ok(None)` will be returned, and the
1824        /// caller may decide to do something based on that (e.g. restart a
1825        /// pagination).
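        ///
        /// Sketch of a caller's side (illustrative only; `state` stands for a
        /// `RoomEventCacheState`):
        ///
        /// ```ignore
        /// match state.handle_backpagination(events, new_token, prev_token).await? {
        ///     Some((outcome, diffs)) => {
        ///         // Forward `diffs` to observers; `outcome.reached_start` says whether
        ///         // the start of the timeline was reached.
        ///     }
        ///     None => {
        ///         // The gap vanished in the meantime: the caller may restart the
        ///         // pagination.
        ///     }
        /// }
        /// ```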
1826        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1827        pub async fn handle_backpagination(
1828            &mut self,
1829            events: Vec<Event>,
1830            mut new_token: Option<String>,
1831            prev_token: Option<String>,
1832        ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
1833        {
1834            // Check that the previous token still exists; otherwise it's a sign that the
1835            // room's timeline has been cleared.
1836            let prev_gap_id = if let Some(token) = prev_token {
1837                // Find the corresponding gap in the in-memory linked chunk.
1838                let gap_chunk_id = self.room_linked_chunk.chunk_identifier(|chunk| {
1839                    matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
1840                });
1841
1842                if gap_chunk_id.is_none() {
1843                    // We got a previous-batch token from the linked chunk *before* running the
1844                    // request, but it is missing *after* completing the request.
1845                    //
1846                    // It may be a sign the linked chunk has been reset, but it's fine, per this
1847                    // function's contract.
1848                    return Ok(None);
1849                }
1850
1851                gap_chunk_id
1852            } else {
1853                None
1854            };
1855
1856            let DeduplicationOutcome {
1857                all_events: mut events,
1858                in_memory_duplicated_event_ids,
1859                in_store_duplicated_event_ids,
1860                non_empty_all_duplicates: all_duplicates,
1861            } = filter_duplicate_events(
1862                &self.store,
1863                LinkedChunkId::Room(self.room.as_ref()),
1864                &self.room_linked_chunk,
1865                events,
1866            )
1867            .await?;
1868
1869            // If not all the events have been back-paginated, we need to remove the
1870            // previous ones, otherwise we can end up with misordered events.
1871            //
1872            // Consider the following scenario:
1873            // - sync returns [D, E, F]
1874            // - then sync returns [] with a previous batch token PB1, so the internal
1875            //   linked chunk state is [D, E, F, PB1].
1876            // - back-paginating with PB1 may return [A, B, C, D, E, F].
1877            //
1878            // Only inserting the new events when replacing PB1 would result in a timeline
1879            // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
1880            // all the events, in case this happens (see also #4746).
1881
1882            if !all_duplicates {
1883                // Let's forget all the previous events.
1884                self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1885                    .await?;
1886            } else {
1887                // All new events are duplicated, they can all be ignored.
1888                events.clear();
1889                // The gap can be ditched too, as it won't be useful to backpaginate any
1890                // further.
1891                new_token = None;
1892            }
1893
1894            // `/messages` has been called with `dir=b` (backwards), so the events are in
1895            // the inverted order; reorder them.
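            // For example, a `/messages` chunk of [C, B, A] becomes [A, B, C] here
            // (letters standing in for events, oldest first after the reversal).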
1896            let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();
1897
1898            let new_gap = new_token.map(|prev_token| Gap { prev_token });
1899            let reached_start = self.room_linked_chunk.finish_back_pagination(
1900                prev_gap_id,
1901                new_gap,
1902                &topo_ordered_events,
1903            );
1904
1905            // Note: this flushes updates to the store.
1906            self.post_process_new_events(topo_ordered_events, false).await?;
1907
1908            let event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
1909
1910            Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
1911        }
1912
1913        /// Subscribe to the thread for a given root event, and get the (maybe
1914        /// empty) list of initially known events for that thread.
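        ///
        /// A usage sketch (illustrative only; `state` stands for a
        /// `RoomEventCacheState`):
        ///
        /// ```ignore
        /// let (initial_events, mut updates) = state.subscribe_to_thread(root.clone());
        /// // `initial_events` may be empty; subsequent changes arrive on `updates`.
        /// while let Ok(update) = updates.recv().await {
        ///     // React to the thread update here.
        /// }
        /// ```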
1915        pub fn subscribe_to_thread(
1916            &mut self,
1917            root: OwnedEventId,
1918        ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
1919            self.get_or_reload_thread(root).subscribe()
1920        }
1921
1922        /// Back paginate in the given thread.
1923        ///
1924        /// Will always start from the end, unless we previously paginated.
1925        pub fn finish_thread_network_pagination(
1926            &mut self,
1927            root: OwnedEventId,
1928            prev_token: Option<String>,
1929            new_token: Option<String>,
1930            events: Vec<Event>,
1931        ) -> Option<BackPaginationOutcome> {
1932            self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
1933        }
1934
1935        pub fn load_more_thread_events_backwards(
1936            &mut self,
1937            root: OwnedEventId,
1938        ) -> LoadMoreEventsBackwardsOutcome {
1939            self.get_or_reload_thread(root).load_more_events_backwards()
1940        }
1941    }
1942}
1943
1944/// An enum representing where an event has been found.
1945pub(super) enum EventLocation {
1946    /// Event lives in memory (and likely in the store!).
1947    Memory(Position),
1948
1949    /// Event lives in the store only, it has not been loaded in memory yet.
1950    Store,
1951}
1952
1953pub(super) use private::RoomEventCacheState;
1954
1955#[cfg(test)]
1956mod tests {
1957    use matrix_sdk_base::event_cache::Event;
1958    use matrix_sdk_test::{async_test, event_factory::EventFactory};
1959    use ruma::{
1960        event_id,
1961        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
1962        room_id, user_id, RoomId,
1963    };
1964
1965    use crate::test_utils::logged_in_client;
1966
1967    #[async_test]
1968    async fn test_find_event_by_id_with_edit_relation() {
1969        let original_id = event_id!("$original");
1970        let related_id = event_id!("$related");
1971        let room_id = room_id!("!galette:saucisse.bzh");
1972        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1973
1974        assert_relations(
1975            room_id,
1976            f.text_msg("Original event").event_id(original_id).into(),
1977            f.text_msg("* An edited event")
1978                .edit(
1979                    original_id,
1980                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
1981                )
1982                .event_id(related_id)
1983                .into(),
1984            f,
1985        )
1986        .await;
1987    }
1988
1989    #[async_test]
1990    async fn test_find_event_by_id_with_thread_reply_relation() {
1991        let original_id = event_id!("$original");
1992        let related_id = event_id!("$related");
1993        let room_id = room_id!("!galette:saucisse.bzh");
1994        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1995
1996        assert_relations(
1997            room_id,
1998            f.text_msg("Original event").event_id(original_id).into(),
1999            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
2000            f,
2001        )
2002        .await;
2003    }
2004
2005    #[async_test]
2006    async fn test_find_event_by_id_with_reaction_relation() {
2007        let original_id = event_id!("$original");
2008        let related_id = event_id!("$related");
2009        let room_id = room_id!("!galette:saucisse.bzh");
2010        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2011
2012        assert_relations(
2013            room_id,
2014            f.text_msg("Original event").event_id(original_id).into(),
2015            f.reaction(original_id, ":D").event_id(related_id).into(),
2016            f,
2017        )
2018        .await;
2019    }
2020
2021    #[async_test]
2022    async fn test_find_event_by_id_with_poll_response_relation() {
2023        let original_id = event_id!("$original");
2024        let related_id = event_id!("$related");
2025        let room_id = room_id!("!galette:saucisse.bzh");
2026        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2027
2028        assert_relations(
2029            room_id,
2030            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2031                .event_id(original_id)
2032                .into(),
2033            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
2034            f,
2035        )
2036        .await;
2037    }
2038
2039    #[async_test]
2040    async fn test_find_event_by_id_with_poll_end_relation() {
2041        let original_id = event_id!("$original");
2042        let related_id = event_id!("$related");
2043        let room_id = room_id!("!galette:saucisse.bzh");
2044        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2045
2046        assert_relations(
2047            room_id,
2048            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2049                .event_id(original_id)
2050                .into(),
2051            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
2052            f,
2053        )
2054        .await;
2055    }
2056
2057    #[async_test]
2058    async fn test_find_event_by_id_with_filtered_relationships() {
2059        let original_id = event_id!("$original");
2060        let related_id = event_id!("$related");
2061        let associated_related_id = event_id!("$recursive_related");
2062        let room_id = room_id!("!galette:saucisse.bzh");
2063        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2064
2065        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2066        let related_event = event_factory
2067            .text_msg("* Edited event")
2068            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2069            .event_id(related_id)
2070            .into();
2071        let associated_related_event =
2072            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
2073
2074        let client = logged_in_client(None).await;
2075
2076        let event_cache = client.event_cache();
2077        event_cache.subscribe().unwrap();
2078
2079        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2080        let room = client.get_room(room_id).unwrap();
2081
2082        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2083
2084        // Save the original event.
2085        room_event_cache.save_events([original_event]).await;
2086
2087        // Save the related event.
2088        room_event_cache.save_events([related_event]).await;
2089
2090        // Save the associated related event, which reacts to the related event.
2091        room_event_cache.save_events([associated_related_event]).await;
2092
2093        let filter = Some(vec![RelationType::Replacement]);
2094        let (event, related_events) =
2095            room_event_cache.find_event_with_relations(original_id, filter).await.unwrap();
2096        // Fetched event is the right one.
2097        let cached_event_id = event.event_id().unwrap();
2098        assert_eq!(cached_event_id, original_id);
2099
2100        // There's only the edit event (an edit event can't have its own edit event).
2101        assert_eq!(related_events.len(), 1);
2102
2103        let related_event_id = related_events[0].event_id().unwrap();
2104        assert_eq!(related_event_id, related_id);
2105
2106        // Now we'll filter by thread relations instead; there should be no related events.
2107        let filter = Some(vec![RelationType::Thread]);
2108        let (event, related_events) =
2109            room_event_cache.find_event_with_relations(original_id, filter).await.unwrap();
2110        // Fetched event is the right one.
2111        let cached_event_id = event.event_id().unwrap();
2112        assert_eq!(cached_event_id, original_id);
2113        // No thread-related events are found.
2114        assert!(related_events.is_empty());
2115    }
2116
2117    #[async_test]
2118    async fn test_find_event_by_id_with_recursive_relation() {
2119        let original_id = event_id!("$original");
2120        let related_id = event_id!("$related");
2121        let associated_related_id = event_id!("$recursive_related");
2122        let room_id = room_id!("!galette:saucisse.bzh");
2123        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2124
2125        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2126        let related_event = event_factory
2127            .text_msg("* Edited event")
2128            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2129            .event_id(related_id)
2130            .into();
2131        let associated_related_event =
2132            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
2133
2134        let client = logged_in_client(None).await;
2135
2136        let event_cache = client.event_cache();
2137        event_cache.subscribe().unwrap();
2138
2139        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2140        let room = client.get_room(room_id).unwrap();
2141
2142        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2143
2144        // Save the original event.
2145        room_event_cache.save_events([original_event]).await;
2146
2147        // Save the related event.
2148        room_event_cache.save_events([related_event]).await;
2149
2150        // Save the associated related event, which reacts to the related event.
2151        room_event_cache.save_events([associated_related_event]).await;
2152
2153        let (event, related_events) =
2154            room_event_cache.find_event_with_relations(original_id, None).await.unwrap();
2155        // Fetched event is the right one.
2156        let cached_event_id = event.event_id().unwrap();
2157        assert_eq!(cached_event_id, original_id);
2158
2159        // Both the related event and the recursively related event are present.
2160        assert_eq!(related_events.len(), 2);
2161
2162        let related_event_id = related_events[0].event_id().unwrap();
2163        assert_eq!(related_event_id, related_id);
2164        let related_event_id = related_events[1].event_id().unwrap();
2165        assert_eq!(related_event_id, associated_related_id);
2166    }
2167
2168    async fn assert_relations(
2169        room_id: &RoomId,
2170        original_event: Event,
2171        related_event: Event,
2172        event_factory: EventFactory,
2173    ) {
2174        let client = logged_in_client(None).await;
2175
2176        let event_cache = client.event_cache();
2177        event_cache.subscribe().unwrap();
2178
2179        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2180        let room = client.get_room(room_id).unwrap();
2181
2182        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2183
2184        // Save the original event.
2185        let original_event_id = original_event.event_id().unwrap();
2186        room_event_cache.save_events([original_event]).await;
2187
2188        // Save an unrelated event to check it's not in the related events list.
2189        let unrelated_id = event_id!("$2");
2190        room_event_cache
2191            .save_events([event_factory
2192                .text_msg("An unrelated event")
2193                .event_id(unrelated_id)
2194                .into()])
2195            .await;
2196
2197        // Save the related event.
2198        let related_id = related_event.event_id().unwrap();
2199        room_event_cache.save_events([related_event]).await;
2200
2201        let (event, related_events) =
2202            room_event_cache.find_event_with_relations(&original_event_id, None).await.unwrap();
2203        // Fetched event is the right one.
2204        let cached_event_id = event.event_id().unwrap();
2205        assert_eq!(cached_event_id, original_event_id);
2206
2207        // Only the actually related event appears in the related events list.
2208        let related_event_id = related_events[0].event_id().unwrap();
2209        assert_eq!(related_event_id, related_id);
2210    }
2211}
2212
2213#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
2214mod timed_tests {
2215    use std::sync::Arc;
2216
2217    use assert_matches::assert_matches;
2218    use assert_matches2::assert_let;
2219    use eyeball_im::VectorDiff;
2220    use futures_util::FutureExt;
2221    use matrix_sdk_base::{
2222        event_cache::{
2223            store::{EventCacheStore as _, MemoryStore},
2224            Gap,
2225        },
2226        linked_chunk::{
2227            lazy_loader::from_all_chunks, ChunkContent, ChunkIdentifier, LinkedChunkId, Position,
2228            Update,
2229        },
2230        store::StoreConfig,
2231        sync::{JoinedRoomUpdate, Timeline},
2232    };
2233    use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE, BOB};
2234    use ruma::{
2235        event_id,
2236        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2237        room_id, user_id, OwnedUserId,
2238    };
2239    use tokio::task::yield_now;
2240
2241    use super::RoomEventCacheGenericUpdate;
2242    use crate::{
2243        assert_let_timeout,
2244        event_cache::{room::LoadMoreEventsBackwardsOutcome, RoomEventCacheUpdate},
2245        test_utils::client::MockClientBuilder,
2246    };
2247
2248    #[async_test]
2249    async fn test_write_to_storage() {
2250        let room_id = room_id!("!galette:saucisse.bzh");
2251        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2252
2253        let event_cache_store = Arc::new(MemoryStore::new());
2254
2255        let client = MockClientBuilder::new(None)
2256            .on_builder(|builder| {
2257                builder.store_config(
2258                    StoreConfig::new("hodlor".to_owned())
2259                        .event_cache_store(event_cache_store.clone()),
2260                )
2261            })
2262            .build()
2263            .await;
2264
2265        let event_cache = client.event_cache();
2266
2267        // Don't forget to subscribe and like^W enable storage!
2268        event_cache.subscribe().unwrap();
2269
2270        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2271        let room = client.get_room(room_id).unwrap();
2272
2273        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2274        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2275
2276        // Propagate an update for a message and a prev-batch token.
2277        let timeline = Timeline {
2278            limited: true,
2279            prev_batch: Some("raclette".to_owned()),
2280            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
2281        };
2282
2283        room_event_cache
2284            .inner
2285            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2286            .await
2287            .unwrap();
2288
2289        // Just checking the generic update is correct.
2290        assert_matches!(
2291            generic_stream.recv().await,
2292            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2293                assert_eq!(expected_room_id, room_id);
2294            }
2295        );
2296
2297        // Check the storage.
2298        let linked_chunk = from_all_chunks::<3, _, _>(
2299            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2300        )
2301        .unwrap()
2302        .unwrap();
2303
2304        assert_eq!(linked_chunk.chunks().count(), 2);
2305
2306        let mut chunks = linked_chunk.chunks();
2307
2308        // We start with the gap.
2309        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
2310            assert_eq!(gap.prev_token, "raclette");
2311        });
2312
2313        // Then we have the stored event.
2314        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2315            assert_eq!(events.len(), 1);
2316            let deserialized = events[0].raw().deserialize().unwrap();
2317            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
2318            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
2319        });
2320
2321        // That's all, folks!
2322        assert!(chunks.next().is_none());
2323    }
2324
2325    #[async_test]
2326    async fn test_write_to_storage_strips_bundled_relations() {
2327        let room_id = room_id!("!galette:saucisse.bzh");
2328        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2329
2330        let event_cache_store = Arc::new(MemoryStore::new());
2331
2332        let client = MockClientBuilder::new(None)
2333            .on_builder(|builder| {
2334                builder.store_config(
2335                    StoreConfig::new("hodlor".to_owned())
2336                        .event_cache_store(event_cache_store.clone()),
2337                )
2338            })
2339            .build()
2340            .await;
2341
2342        let event_cache = client.event_cache();
2343
2344        // Don't forget to subscribe and like^W enable storage!
2345        event_cache.subscribe().unwrap();
2346
2347        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2348        let room = client.get_room(room_id).unwrap();
2349
2350        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2351        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2352
2353        // Propagate an update for a message with bundled relations.
2354        let ev = f
2355            .text_msg("hey yo")
2356            .sender(*ALICE)
2357            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
2358            .into_event();
2359
2360        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
2361
2362        room_event_cache
2363            .inner
2364            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2365            .await
2366            .unwrap();
2367
2368        // Just checking the generic update is correct.
2369        assert_matches!(
2370            generic_stream.recv().await,
2371            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2372                assert_eq!(expected_room_id, room_id);
2373            }
2374        );
2375
2376        // The in-memory linked chunk keeps the bundled relation.
2377        {
2378            let events = room_event_cache.events().await;
2379
2380            assert_eq!(events.len(), 1);
2381
2382            let ev = events[0].raw().deserialize().unwrap();
2383            assert_let!(
2384                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
2385            );
2386
2387            let original = msg.as_original().unwrap();
2388            assert_eq!(original.content.body(), "hey yo");
2389            assert!(original.unsigned.relations.replace.is_some());
2390        }
2391
2392        // The one in storage does not.
2393        let linked_chunk = from_all_chunks::<3, _, _>(
2394            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2395        )
2396        .unwrap()
2397        .unwrap();
2398
2399        assert_eq!(linked_chunk.chunks().count(), 1);
2400
2401        let mut chunks = linked_chunk.chunks();
2402        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2403            assert_eq!(events.len(), 1);
2404
2405            let ev = events[0].raw().deserialize().unwrap();
2406            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
2407
2408            let original = msg.as_original().unwrap();
2409            assert_eq!(original.content.body(), "hey yo");
2410            assert!(original.unsigned.relations.replace.is_none());
2411        });
2412
2413        // That's all, folks!
2414        assert!(chunks.next().is_none());
2415    }
2416
2417    #[async_test]
2418    async fn test_clear() {
2419        let room_id = room_id!("!galette:saucisse.bzh");
2420        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2421
2422        let event_cache_store = Arc::new(MemoryStore::new());
2423
2424        let event_id1 = event_id!("$1");
2425        let event_id2 = event_id!("$2");
2426
2427        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2428        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2429
2430        // Prefill the store with some data.
2431        event_cache_store
2432            .handle_linked_chunk_updates(
2433                LinkedChunkId::Room(room_id),
2434                vec![
2435                    // An empty items chunk.
2436                    Update::NewItemsChunk {
2437                        previous: None,
2438                        new: ChunkIdentifier::new(0),
2439                        next: None,
2440                    },
2441                    // A gap chunk.
2442                    Update::NewGapChunk {
2443                        previous: Some(ChunkIdentifier::new(0)),
2444                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
2445                        new: ChunkIdentifier::new(42),
2446                        next: None,
2447                        gap: Gap { prev_token: "comté".to_owned() },
2448                    },
2449                    // Another items chunk, non-empty this time.
2450                    Update::NewItemsChunk {
2451                        previous: Some(ChunkIdentifier::new(42)),
2452                        new: ChunkIdentifier::new(1),
2453                        next: None,
2454                    },
2455                    Update::PushItems {
2456                        at: Position::new(ChunkIdentifier::new(1), 0),
2457                        items: vec![ev1.clone()],
2458                    },
2459                    // And another items chunk, non-empty again.
2460                    Update::NewItemsChunk {
2461                        previous: Some(ChunkIdentifier::new(1)),
2462                        new: ChunkIdentifier::new(2),
2463                        next: None,
2464                    },
2465                    Update::PushItems {
2466                        at: Position::new(ChunkIdentifier::new(2), 0),
2467                        items: vec![ev2.clone()],
2468                    },
2469                ],
2470            )
2471            .await
2472            .unwrap();
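        // At this point, the store contains (in order): an empty items chunk, a gap
        // chunk ("comté"), an items chunk holding `ev1`, and an items chunk holding
        // `ev2`.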
2473
2474        let client = MockClientBuilder::new(None)
2475            .on_builder(|builder| {
2476                builder.store_config(
2477                    StoreConfig::new("hodlor".to_owned())
2478                        .event_cache_store(event_cache_store.clone()),
2479                )
2480            })
2481            .build()
2482            .await;
2483
2484        let event_cache = client.event_cache();
2485
2486        // Don't forget to subscribe and like^W enable storage!
2487        event_cache.subscribe().unwrap();
2488
2489        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2490        let room = client.get_room(room_id).unwrap();
2491
2492        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2493
2494        let (items, mut stream) = room_event_cache.subscribe().await;
2495        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2496
2497        // The room knows about all cached events.
2498        {
2499            assert!(room_event_cache.find_event(event_id1).await.is_some());
2500            assert!(room_event_cache.find_event(event_id2).await.is_some());
2501        }
2502
2503        // But only some of the events are loaded from the store.
2504        {
2505            // The room must contain only one event because only one chunk has been loaded.
2506            assert_eq!(items.len(), 1);
2507            assert_eq!(items[0].event_id().unwrap(), event_id2);
2508
2509            assert!(stream.is_empty());
2510        }
2511
2512        // Let's load more chunks to load all events.
2513        {
2514            room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2515
2516            assert_let_timeout!(
2517                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2518            );
2519            assert_eq!(diffs.len(), 1);
2520            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2521                // Here you are `event_id1`!
2522                assert_eq!(event.event_id().unwrap(), event_id1);
2523            });
2524
2525            assert!(stream.is_empty());
2526        }
2527
2528        // After clearing,…
2529        room_event_cache.clear().await.unwrap();
2530
2531        //… we get an update that the content has been cleared.
2532        assert_let_timeout!(
2533            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2534        );
2535        assert_eq!(diffs.len(), 1);
2536        assert_let!(VectorDiff::Clear = &diffs[0]);
2537
2538        // … same with a generic update.
2539        assert_let_timeout!(
2540            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
2541        );
2542        assert_eq!(received_room_id, room_id);
2543
2544        // Individual events are not forgotten by the event cache after clearing a
2545        // room.
2546        assert!(room_event_cache.find_event(event_id1).await.is_some());
2547
2548        // But their presence in a linked chunk is forgotten.
2549        let items = room_event_cache.events().await;
2550        assert!(items.is_empty());
2551
2552        // The event cache store too.
2553        let linked_chunk = from_all_chunks::<3, _, _>(
2554            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2555        )
2556        .unwrap()
2557        .unwrap();
2558
2559        // Note: while the event cache store could return `None` here, clearing it will
2560        // reset it to its initial form, maintaining the invariant that it
2561        // contains a single items chunk that's empty.
2562        assert_eq!(linked_chunk.num_items(), 0);
2563    }
2564
2565    #[async_test]
2566    async fn test_load_from_storage() {
2567        let room_id = room_id!("!galette:saucisse.bzh");
2568        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2569
2570        let event_cache_store = Arc::new(MemoryStore::new());
2571
2572        let event_id1 = event_id!("$1");
2573        let event_id2 = event_id!("$2");
2574
2575        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2576        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2577
2578        // Prefill the store with some data.
2579        event_cache_store
2580            .handle_linked_chunk_updates(
2581                LinkedChunkId::Room(room_id),
2582                vec![
2583                    // An empty items chunk.
2584                    Update::NewItemsChunk {
2585                        previous: None,
2586                        new: ChunkIdentifier::new(0),
2587                        next: None,
2588                    },
2589                    // A gap chunk.
2590                    Update::NewGapChunk {
2591                        previous: Some(ChunkIdentifier::new(0)),
2592                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
2593                        new: ChunkIdentifier::new(42),
2594                        next: None,
2595                        gap: Gap { prev_token: "cheddar".to_owned() },
2596                    },
2597                    // Another items chunk, non-empty this time.
2598                    Update::NewItemsChunk {
2599                        previous: Some(ChunkIdentifier::new(42)),
2600                        new: ChunkIdentifier::new(1),
2601                        next: None,
2602                    },
2603                    Update::PushItems {
2604                        at: Position::new(ChunkIdentifier::new(1), 0),
2605                        items: vec![ev1.clone()],
2606                    },
2607                    // And another items chunk, non-empty again.
2608                    Update::NewItemsChunk {
2609                        previous: Some(ChunkIdentifier::new(1)),
2610                        new: ChunkIdentifier::new(2),
2611                        next: None,
2612                    },
2613                    Update::PushItems {
2614                        at: Position::new(ChunkIdentifier::new(2), 0),
2615                        items: vec![ev2.clone()],
2616                    },
2617                ],
2618            )
2619            .await
2620            .unwrap();
2621
2622        let client = MockClientBuilder::new(None)
2623            .on_builder(|builder| {
2624                builder.store_config(
2625                    StoreConfig::new("hodlor".to_owned())
2626                        .event_cache_store(event_cache_store.clone()),
2627                )
2628            })
2629            .build()
2630            .await;
2631
2632        let event_cache = client.event_cache();
2633
2634        // Don't forget to subscribe and like^W enable storage!
2635        event_cache.subscribe().unwrap();
2636
2637        // Let's check whether the generic updates are received for the initialisation.
2638        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2639
2640        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2641        let room = client.get_room(room_id).unwrap();
2642
2643        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2644
2645        // The room event cache has been loaded. A generic update must have been
2646        // triggered.
2647        assert_matches!(
2648            generic_stream.recv().await,
2649            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2650                assert_eq!(room_id, expected_room_id);
2651            }
2652        );
2653
2654        let (items, mut stream) = room_event_cache.subscribe().await;
2655
2656        // The initial items contain one event because only the last chunk is loaded by
2657        // default.
2658        assert_eq!(items.len(), 1);
2659        assert_eq!(items[0].event_id().unwrap(), event_id2);
2660        assert!(stream.is_empty());
2661
2662        // The event cache knows about all events though, even if they aren't loaded.
2663        assert!(room_event_cache.find_event(event_id1).await.is_some());
2664        assert!(room_event_cache.find_event(event_id2).await.is_some());
2665
2666        // Let's paginate to load more events.
2667        room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2668
2669        assert_let_timeout!(
2670            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2671        );
2672        assert_eq!(diffs.len(), 1);
2673        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2674            assert_eq!(event.event_id().unwrap(), event_id1);
2675        });
2676
2677        assert!(stream.is_empty());
2678
2679        // A generic update is triggered too.
2680        assert_matches!(
2681            generic_stream.recv().await,
2682            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2683                assert_eq!(expected_room_id, room_id);
2684            }
2685        );
2686
2687        // A new update with one of these events leads to deduplication.
2688        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
2689
2690        room_event_cache
2691            .inner
2692            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2693            .await
2694            .unwrap();
2695
2696        // The new event is a duplicate, so no generic update is emitted
2697        // whatsoever!
2698        assert!(generic_stream.recv().now_or_never().is_none());
2699
2700        // The stream doesn't report these changes *yet*. Read the current events to
2701        // check that the items are at their new positions: the duplicated event has
2702        // been removed from its previous position and appended to the back of the
2703        // list.
2704        let items = room_event_cache.events().await;
2705        assert_eq!(items.len(), 2);
2706        assert_eq!(items[0].event_id().unwrap(), event_id1);
2707        assert_eq!(items[1].event_id().unwrap(), event_id2);
2708    }
2709
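    // Checks that corrupted data in the store (here, two chunks forming a cycle)
    // doesn't prevent the room event cache from loading: the room's stored
    // linked chunk is reset instead.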
2710    #[async_test]
2711    async fn test_load_from_storage_resilient_to_failure() {
2712        let room_id = room_id!("!fondue:patate.ch");
2713        let event_cache_store = Arc::new(MemoryStore::new());
2714
2715        let event = EventFactory::new()
2716            .room(room_id)
2717            .sender(user_id!("@ben:saucisse.bzh"))
2718            .text_msg("foo")
2719            .event_id(event_id!("$42"))
2720            .into_event();
2721
2722        // Prefill the store with invalid data: two chunks that form a cycle.
2723        event_cache_store
2724            .handle_linked_chunk_updates(
2725                LinkedChunkId::Room(room_id),
2726                vec![
2727                    Update::NewItemsChunk {
2728                        previous: None,
2729                        new: ChunkIdentifier::new(0),
2730                        next: None,
2731                    },
2732                    Update::PushItems {
2733                        at: Position::new(ChunkIdentifier::new(0), 0),
2734                        items: vec![event],
2735                    },
2736                    Update::NewItemsChunk {
2737                        previous: Some(ChunkIdentifier::new(0)),
2738                        new: ChunkIdentifier::new(1),
2739                        next: Some(ChunkIdentifier::new(0)),
2740                    },
2741                ],
2742            )
2743            .await
2744            .unwrap();
2745
2746        let client = MockClientBuilder::new(None)
2747            .on_builder(|builder| {
2748                builder.store_config(
2749                    StoreConfig::new("holder".to_owned())
2750                        .event_cache_store(event_cache_store.clone()),
2751                )
2752            })
2753            .build()
2754            .await;
2755
2756        let event_cache = client.event_cache();
2757
2758        // Don't forget to subscribe and like^W enable storage!
2759        event_cache.subscribe().unwrap();
2760
2761        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2762        let room = client.get_room(room_id).unwrap();
2763
2764        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2765
2766        let items = room_event_cache.events().await;
2767
2768        // Because the persisted content was invalid, the room store is reset: there are
2769        // no events in the cache.
2770        assert!(items.is_empty());
2771
2772        // Storage doesn't contain anything. It would also be valid for it to contain
2773        // a single initial empty items chunk.
2774        let raw_chunks =
2775            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
2776        assert!(raw_chunks.is_empty());
2777    }
2778
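    // Checks that a gap is stored for a limited sync carrying a prev-batch
    // token, and that no extra gap is created for a non-limited sync, even if it
    // also carries a token.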
2779    #[async_test]
2780    async fn test_no_useless_gaps() {
2781        let room_id = room_id!("!galette:saucisse.bzh");
2782
2783        let client = MockClientBuilder::new(None).build().await;
2784
2785        let event_cache = client.event_cache();
2786        event_cache.subscribe().unwrap();
2787
2788        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2789        let room = client.get_room(room_id).unwrap();
2790        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2791        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2792
2793        let f = EventFactory::new().room(room_id).sender(*ALICE);
2794
2795        // Propagate an update including a limited timeline with one message and a
2796        // prev-batch token.
2797        room_event_cache
2798            .inner
2799            .handle_joined_room_update(JoinedRoomUpdate {
2800                timeline: Timeline {
2801                    limited: true,
2802                    prev_batch: Some("raclette".to_owned()),
2803                    events: vec![f.text_msg("hey yo").into_event()],
2804                },
2805                ..Default::default()
2806            })
2807            .await
2808            .unwrap();
2809
2810        // Just checking the generic update is correct.
2811        assert_matches!(
2812            generic_stream.recv().await,
2813            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2814                assert_eq!(expected_room_id, room_id);
2815            }
2816        );
2817
2818        {
2819            let mut state = room_event_cache.inner.state.write().await;
2820
2821            let mut num_gaps = 0;
2822            let mut num_events = 0;
2823
2824            for c in state.room_linked_chunk().chunks() {
2825                match c.content() {
2826                    ChunkContent::Items(items) => num_events += items.len(),
2827                    ChunkContent::Gap(_) => num_gaps += 1,
2828                }
2829            }
2830
2831            // The limited sync unloads the chunk, so it will appear as if there are only
2832            // the events.
2833            assert_eq!(num_gaps, 0);
2834            assert_eq!(num_events, 1);
2835
2836            // But if I manually reload more of the chunk, the gap will be present.
2837            assert_matches!(
2838                state.load_more_events_backwards().await.unwrap(),
2839                LoadMoreEventsBackwardsOutcome::Gap { .. }
2840            );
2841
2842            num_gaps = 0;
2843            num_events = 0;
2844            for c in state.room_linked_chunk().chunks() {
2845                match c.content() {
2846                    ChunkContent::Items(items) => num_events += items.len(),
2847                    ChunkContent::Gap(_) => num_gaps += 1,
2848                }
2849            }
2850
2851            // The gap must have been stored.
2852            assert_eq!(num_gaps, 1);
2853            assert_eq!(num_events, 1);
2854        }
2855
2856        // Now, propagate an update for another message, but the timeline isn't limited
2857        // this time.
2858        room_event_cache
2859            .inner
2860            .handle_joined_room_update(JoinedRoomUpdate {
2861                timeline: Timeline {
2862                    limited: false,
2863                    prev_batch: Some("fondue".to_owned()),
2864                    events: vec![f.text_msg("sup").into_event()],
2865                },
2866                ..Default::default()
2867            })
2868            .await
2869            .unwrap();
2870
2871        // Just checking the generic update is correct.
2872        assert_matches!(
2873            generic_stream.recv().await,
2874            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2875                assert_eq!(expected_room_id, room_id);
2876            }
2877        );
2878
2879        {
2880            let state = room_event_cache.inner.state.read().await;
2881
2882            let mut num_gaps = 0;
2883            let mut num_events = 0;
2884
2885            for c in state.room_linked_chunk().chunks() {
2886                match c.content() {
2887                    ChunkContent::Items(items) => num_events += items.len(),
2888                    ChunkContent::Gap(gap) => {
2889                        assert_eq!(gap.prev_token, "raclette");
2890                        num_gaps += 1;
2891                    }
2892                }
2893            }
2894
2895            // There's only the previous gap, no new ones.
2896            assert_eq!(num_gaps, 1);
2897            assert_eq!(num_events, 2);
2898        }
2899    }
2900
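    // Checks that force-shrinking the linked chunk to its last chunk keeps only
    // the latest events in memory, while earlier events remain available from
    // the store via back-pagination.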
2901    #[async_test]
2902    async fn test_shrink_to_last_chunk() {
2903        let room_id = room_id!("!galette:saucisse.bzh");
2904
2905        let client = MockClientBuilder::new(None).build().await;
2906
2907        let f = EventFactory::new().room(room_id);
2908
2909        let evid1 = event_id!("$1");
2910        let evid2 = event_id!("$2");
2911
2912        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
2913        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
2914
2915        // Fill the event cache store with an initial linked chunk with 2 event chunks.
2916        {
2917            let store = client.event_cache_store();
2918            let store = store.lock().await.unwrap();
2919            store
2920                .handle_linked_chunk_updates(
2921                    LinkedChunkId::Room(room_id),
2922                    vec![
2923                        Update::NewItemsChunk {
2924                            previous: None,
2925                            new: ChunkIdentifier::new(0),
2926                            next: None,
2927                        },
2928                        Update::PushItems {
2929                            at: Position::new(ChunkIdentifier::new(0), 0),
2930                            items: vec![ev1],
2931                        },
2932                        Update::NewItemsChunk {
2933                            previous: Some(ChunkIdentifier::new(0)),
2934                            new: ChunkIdentifier::new(1),
2935                            next: None,
2936                        },
2937                        Update::PushItems {
2938                            at: Position::new(ChunkIdentifier::new(1), 0),
2939                            items: vec![ev2],
2940                        },
2941                    ],
2942                )
2943                .await
2944                .unwrap();
2945        }
2946
2947        let event_cache = client.event_cache();
2948        event_cache.subscribe().unwrap();
2949
2950        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2951        let room = client.get_room(room_id).unwrap();
2952        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2953
2954        // Sanity check: lazily loaded, so only includes one item at start.
2955        let (events, mut stream) = room_event_cache.subscribe().await;
2956        assert_eq!(events.len(), 1);
2957        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
2958        assert!(stream.is_empty());
2959
2960        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2961
2962        // Force loading the full linked chunk by back-paginating.
2963        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2964        assert_eq!(outcome.events.len(), 1);
2965        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2966        assert!(outcome.reached_start);
2967
2968        // We also get an update about the loading from the store.
2969        assert_let_timeout!(
2970            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2971        );
2972        assert_eq!(diffs.len(), 1);
2973        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
2974            assert_eq!(value.event_id().as_deref(), Some(evid1));
2975        });
2976
2977        assert!(stream.is_empty());
2978
2979        // Same for the generic update.
2980        assert_let_timeout!(
2981            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
2982        );
2983        assert_eq!(received_room_id, room_id);
2984
2985        // Shrink the linked chunk to the last chunk.
2986        let diffs = room_event_cache
2987            .inner
2988            .state
2989            .write()
2990            .await
2991            .force_shrink_to_last_chunk()
2992            .await
2993            .expect("shrinking should succeed");
2994
2995        // We receive updates about the changes to the linked chunk.
2996        assert_eq!(diffs.len(), 2);
2997        assert_matches!(&diffs[0], VectorDiff::Clear);
2998        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
2999            assert_eq!(values.len(), 1);
3000            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
3001        });
3002
3003        assert!(stream.is_empty());
3004
3005        // No generic update is sent in this case.
3006        assert!(generic_stream.is_empty());
3007
3008        // When reading the events, we do get only the last one.
3009        let events = room_event_cache.events().await;
3010        assert_eq!(events.len(), 1);
3011        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3012
3013        // But if we back-paginate, we don't need network access to find out about
3014        // the previous event.
3015        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3016        assert_eq!(outcome.events.len(), 1);
3017        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3018        assert!(outcome.reached_start);
3019    }
3020
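    // Checks that `room_event_order` gives a consistent ordering for events,
    // whether they are loaded in memory or not, and that a deduplicated event
    // loses the order attached to its previous position.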
3021    #[async_test]
3022    async fn test_room_ordering() {
3023        let room_id = room_id!("!galette:saucisse.bzh");
3024
3025        let client = MockClientBuilder::new(None).build().await;
3026
3027        let f = EventFactory::new().room(room_id).sender(*ALICE);
3028
3029        let evid1 = event_id!("$1");
3030        let evid2 = event_id!("$2");
3031        let evid3 = event_id!("$3");
3032
3033        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
3034        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3035        let ev3 = f.text_msg("yo").event_id(evid3).into_event();
3036
3037        // Fill the event cache store with an initial linked chunk with 2 event chunks.
3038        {
3039            let store = client.event_cache_store();
3040            let store = store.lock().await.unwrap();
3041            store
3042                .handle_linked_chunk_updates(
3043                    LinkedChunkId::Room(room_id),
3044                    vec![
3045                        Update::NewItemsChunk {
3046                            previous: None,
3047                            new: ChunkIdentifier::new(0),
3048                            next: None,
3049                        },
3050                        Update::PushItems {
3051                            at: Position::new(ChunkIdentifier::new(0), 0),
3052                            items: vec![ev1, ev2],
3053                        },
3054                        Update::NewItemsChunk {
3055                            previous: Some(ChunkIdentifier::new(0)),
3056                            new: ChunkIdentifier::new(1),
3057                            next: None,
3058                        },
3059                        Update::PushItems {
3060                            at: Position::new(ChunkIdentifier::new(1), 0),
3061                            items: vec![ev3.clone()],
3062                        },
3063                    ],
3064                )
3065                .await
3066                .unwrap();
3067        }
3068
3069        let event_cache = client.event_cache();
3070        event_cache.subscribe().unwrap();
3071
3072        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3073        let room = client.get_room(room_id).unwrap();
3074        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3075
3076        // Initially, the linked chunk only contains the last chunk, so only ev3 is
3077        // loaded.
3078        {
3079            let state = room_event_cache.inner.state.read().await;
3080
3081            // But we can get the order of ev1.
3082            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));
3083
3084            // And that of ev2 as well.
3085            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));
3086
3087            // ev3, which is loaded, also has a known ordering.
3088            let mut events = state.room_linked_chunk().events();
3089            let (pos, ev) = events.next().unwrap();
3090            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
3091            assert_eq!(ev.event_id().as_deref(), Some(evid3));
3092            assert_eq!(state.room_event_order(pos), Some(2));
3093
3094            // No other loaded events.
3095            assert!(events.next().is_none());
3096        }
3097
3098        // Force loading the full linked chunk by back-paginating.
3099        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3100        assert!(outcome.reached_start);
3101
3102        // All events are now loaded, so their order is precisely their enumerated index
3103        // in a linear iteration.
3104        {
3105            let state = room_event_cache.inner.state.read().await;
3106            for (i, (pos, _)) in state.room_linked_chunk().events().enumerate() {
3107                assert_eq!(state.room_event_order(pos), Some(i));
3108            }
3109        }
3110
3111        // Handle a gappy sync with two events (including one duplicate, so
3112        // deduplication kicks in). This shrinks the linked chunk to its last
3113        // chunk, which then contains only the last two
3114        // events.
3115        let evid4 = event_id!("$4");
3116        room_event_cache
3117            .inner
3118            .handle_joined_room_update(JoinedRoomUpdate {
3119                timeline: Timeline {
3120                    limited: true,
3121                    prev_batch: Some("fondue".to_owned()),
3122                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
3123                },
3124                ..Default::default()
3125            })
3126            .await
3127            .unwrap();
3128
3129        {
3130            let state = room_event_cache.inner.state.read().await;
3131
3132            // After the shrink, only evid3 and evid4 are loaded.
3133            let mut events = state.room_linked_chunk().events();
3134
3135            let (pos, ev) = events.next().unwrap();
3136            assert_eq!(ev.event_id().as_deref(), Some(evid3));
3137            assert_eq!(state.room_event_order(pos), Some(2));
3138
3139            let (pos, ev) = events.next().unwrap();
3140            assert_eq!(ev.event_id().as_deref(), Some(evid4));
3141            assert_eq!(state.room_event_order(pos), Some(3));
3142
3143            // No other loaded events.
3144            assert!(events.next().is_none());
3145
3146            // But we can still get the order of previous events.
3147            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));
3148            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));
3149
3150            // ev3 doesn't have an order at its previous position anymore, since it's
3151            // been deduplicated.
3152            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(1), 0)), None);
3153        }
3154    }
3155
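    // Checks that the room's linked chunk is automatically shrunk back to its
    // last chunk once the last subscriber is dropped.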
3156    #[async_test]
3157    async fn test_auto_shrink_after_all_subscribers_are_gone() {
3158        let room_id = room_id!("!galette:saucisse.bzh");
3159
3160        let client = MockClientBuilder::new(None).build().await;
3161
3162        let f = EventFactory::new().room(room_id);
3163
3164        let evid1 = event_id!("$1");
3165        let evid2 = event_id!("$2");
3166
3167        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3168        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3169
3170        // Fill the event cache store with an initial linked chunk with 2 event chunks.
3171        {
3172            let store = client.event_cache_store();
3173            let store = store.lock().await.unwrap();
3174            store
3175                .handle_linked_chunk_updates(
3176                    LinkedChunkId::Room(room_id),
3177                    vec![
3178                        Update::NewItemsChunk {
3179                            previous: None,
3180                            new: ChunkIdentifier::new(0),
3181                            next: None,
3182                        },
3183                        Update::PushItems {
3184                            at: Position::new(ChunkIdentifier::new(0), 0),
3185                            items: vec![ev1],
3186                        },
3187                        Update::NewItemsChunk {
3188                            previous: Some(ChunkIdentifier::new(0)),
3189                            new: ChunkIdentifier::new(1),
3190                            next: None,
3191                        },
3192                        Update::PushItems {
3193                            at: Position::new(ChunkIdentifier::new(1), 0),
3194                            items: vec![ev2],
3195                        },
3196                    ],
3197                )
3198                .await
3199                .unwrap();
3200        }
3201
3202        let event_cache = client.event_cache();
3203        event_cache.subscribe().unwrap();
3204
3205        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3206        let room = client.get_room(room_id).unwrap();
3207        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3208
3209        // Sanity check: lazily loaded, so only includes one item at start.
3210        let (events1, mut stream1) = room_event_cache.subscribe().await;
3211        assert_eq!(events1.len(), 1);
3212        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
3213        assert!(stream1.is_empty());
3214
3215        // Force loading the full linked chunk by back-paginating.
3216        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3217        assert_eq!(outcome.events.len(), 1);
3218        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3219        assert!(outcome.reached_start);
3220
3221        // We also get an update about the loading from the store. Consume it, for
3222        // this test's sake.
3223        assert_let_timeout!(
3224            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
3225        );
3226        assert_eq!(diffs.len(), 1);
3227        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3228            assert_eq!(value.event_id().as_deref(), Some(evid1));
3229        });
3230
3231        assert!(stream1.is_empty());
3232
3233        // Have another subscriber.
3234        // Since it's not the first one, and the previous one loaded some more events,
3235        // the second subscriber sees them all.
3236        let (events2, stream2) = room_event_cache.subscribe().await;
3237        assert_eq!(events2.len(), 2);
3238        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
3239        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
3240        assert!(stream2.is_empty());
3241
3242        // Drop the first stream, and wait a bit.
3243        drop(stream1);
3244        yield_now().await;
3245
3246        // The second stream remains undisturbed.
3247        assert!(stream2.is_empty());
3248
3249        // Now drop the second stream, and wait a bit.
3250        drop(stream2);
3251        yield_now().await;
3252
3253        // The linked chunk must have auto-shrunk by now.
3254
3255        {
3256            // Check the inner state: there's no more shared auto-shrinker.
3257            let state = room_event_cache.inner.state.read().await;
3258            assert_eq!(state.subscriber_count.load(std::sync::atomic::Ordering::SeqCst), 0);
3259        }
3260
3261        // Getting the events will only give us the latest chunk.
3262        let events3 = room_event_cache.events().await;
3263        assert_eq!(events3.len(), 1);
3264        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
3265    }
3266
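    // Checks that `rfind_map_event_in_memory_by` searches the in-memory events
    // in reverse order, and doesn't see events that are only present in storage.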
3267    #[async_test]
3268    async fn test_rfind_map_event_in_memory_by() {
3269        let user_id = user_id!("@mnt_io:matrix.org");
3270        let room_id = room_id!("!raclette:patate.ch");
3271        let client = MockClientBuilder::new(None).build().await;
3272
3273        let event_factory = EventFactory::new().room(room_id);
3274
3275        let event_id_0 = event_id!("$ev0");
3276        let event_id_1 = event_id!("$ev1");
3277        let event_id_2 = event_id!("$ev2");
3278        let event_id_3 = event_id!("$ev3");
3279
3280        let event_0 =
3281            event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3282        let event_1 =
3283            event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3284        let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3285        let event_3 =
3286            event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3287
3288        // Fill the event cache store with an initial linked chunk of 2 chunks and 4
3289        // events.
3290        {
3291            let store = client.event_cache_store();
3292            let store = store.lock().await.unwrap();
3293            store
3294                .handle_linked_chunk_updates(
3295                    LinkedChunkId::Room(room_id),
3296                    vec![
3297                        Update::NewItemsChunk {
3298                            previous: None,
3299                            new: ChunkIdentifier::new(0),
3300                            next: None,
3301                        },
3302                        Update::PushItems {
3303                            at: Position::new(ChunkIdentifier::new(0), 0),
3304                            items: vec![event_3],
3305                        },
3306                        Update::NewItemsChunk {
3307                            previous: Some(ChunkIdentifier::new(0)),
3308                            new: ChunkIdentifier::new(1),
3309                            next: None,
3310                        },
3311                        Update::PushItems {
3312                            at: Position::new(ChunkIdentifier::new(1), 0),
3313                            items: vec![event_0, event_1, event_2],
3314                        },
3315                    ],
3316                )
3317                .await
3318                .unwrap();
3319        }
3320
3321        let event_cache = client.event_cache();
3322        event_cache.subscribe().unwrap();
3323
3324        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3325        let room = client.get_room(room_id).unwrap();
3326        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3327
3328        // Look for an event from `BOB`: it must be `event_0`.
3329        assert_matches!(
3330            room_event_cache
3331                .rfind_map_event_in_memory_by(|event| {
3332                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| event.event_id())
3333                })
3334                .await,
3335            Some(event_id) => {
3336                assert_eq!(event_id.as_deref(), Some(event_id_0));
3337            }
3338        );
3339
3340        // Look for an event from `ALICE`: it must be `event_2`, which is found before
3341        // `event_1` because events are searched in reverse order.
3342        assert_matches!(
3343            room_event_cache
3344                .rfind_map_event_in_memory_by(|event| {
3345                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| event.event_id())
3346                })
3347                .await,
3348            Some(event_id) => {
3349                assert_eq!(event_id.as_deref(), Some(event_id_2));
3350            }
3351        );
3352
3353        // Look for an event that is inside the storage, but not loaded.
3354        assert!(room_event_cache
3355            .rfind_map_event_in_memory_by(|event| {
3356                (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
3357                    == Some(user_id))
3358                .then(|| event.event_id())
3359            })
3360            .await
3361            .is_none());
3362
3363        // Look for an event that doesn't exist.
3364        assert!(room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.is_none());
3365    }
3366}