matrix_sdk/event_cache/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All event cache types for a single room.
16
17use std::{
18    collections::BTreeMap,
19    fmt,
20    ops::{Deref, DerefMut},
21    sync::{
22        Arc,
23        atomic::{AtomicUsize, Ordering},
24    },
25};
26
27use events::sort_positions_descending;
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31    deserialized_responses::AmbiguityChange,
32    event_cache::Event,
33    linked_chunk::Position,
34    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
35};
36use ruma::{
37    EventId, OwnedEventId, OwnedRoomId,
38    api::Direction,
39    events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
40    serde::Raw,
41};
42use tokio::sync::{
43    Notify, RwLock,
44    broadcast::{Receiver, Sender},
45    mpsc,
46};
47use tracing::{instrument, trace, warn};
48
49use super::{
50    AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
51    RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
52};
53use crate::{
54    client::WeakClient,
55    event_cache::EventCacheError,
56    room::{IncludeRelations, RelationsOptions, WeakRoom},
57};
58
59pub(super) mod events;
60mod threads;
61
62pub use threads::ThreadEventCacheUpdate;
63
64/// A subset of an event cache, for a room.
65///
66/// Cloning is shallow, and thus is cheap to do.
67#[derive(Clone)]
68pub struct RoomEventCache {
69    pub(super) inner: Arc<RoomEventCacheInner>,
70}
71
72impl fmt::Debug for RoomEventCache {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        f.debug_struct("RoomEventCache").finish_non_exhaustive()
75    }
76}
77
78/// Thin wrapper for a room event cache subscriber, so as to trigger
79/// side-effects when all subscribers are gone.
80///
81/// The current side-effect is: auto-shrinking the [`RoomEventCache`] when no
82/// more subscribers are active. This is an optimisation to reduce the amount of
83/// data held in memory by a [`RoomEventCache`]: when no more subscribers are
84/// active, all data are reduced to the minimum.
85///
86/// The side-effect takes effect on `Drop`.
87#[allow(missing_debug_implementations)]
88pub struct RoomEventCacheSubscriber {
89    /// Underlying receiver of the room event cache's updates.
90    recv: Receiver<RoomEventCacheUpdate>,
91
92    /// To which room are we listening?
93    room_id: OwnedRoomId,
94
95    /// Sender to the auto-shrink channel.
96    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
97
98    /// Shared count of active subscribers, used by the auto-shrinker.
99    subscriber_count: Arc<AtomicUsize>,
100}
101
102impl Drop for RoomEventCacheSubscriber {
103    fn drop(&mut self) {
104        let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
105
106        trace!(
107            "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
108        );
109
110        if previous_subscriber_count == 1 {
111            // We were the last instance of the subscriber; let the auto-shrinker know by
112            // notifying it of our room id.
113
114            let mut room_id = self.room_id.clone();
115
116            // Try to send without waiting for channel capacity, and restart in a spin-loop
117            // if it failed (until a maximum number of attempts is reached, or
118            // the send was successful). The channel shouldn't be super busy in
119            // general, so this should resolve quickly enough.
120
121            let mut num_attempts = 0;
122
123            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
124                num_attempts += 1;
125
126                if num_attempts > 1024 {
127                    // If we've tried too many times, just give up with a warning; after all, this
128                    // is only an optimization.
129                    warn!(
130                        "couldn't send notification to the auto-shrink channel \
131                         after 1024 attempts; giving up"
132                    );
133                    return;
134                }
135
136                match err {
137                    mpsc::error::TrySendError::Full(stolen_room_id) => {
138                        room_id = stolen_room_id;
139                    }
140                    mpsc::error::TrySendError::Closed(_) => return,
141                }
142            }
143
144            trace!("sent notification to the parent channel that we were the last subscriber");
145        }
146    }
147}
148
149impl Deref for RoomEventCacheSubscriber {
150    type Target = Receiver<RoomEventCacheUpdate>;
151
152    fn deref(&self) -> &Self::Target {
153        &self.recv
154    }
155}
156
157impl DerefMut for RoomEventCacheSubscriber {
158    fn deref_mut(&mut self) -> &mut Self::Target {
159        &mut self.recv
160    }
161}
162
163impl RoomEventCache {
164    /// Create a new [`RoomEventCache`] using the given room and store.
165    pub(super) fn new(
166        client: WeakClient,
167        state: RoomEventCacheState,
168        pagination_status: SharedObservable<RoomPaginationStatus>,
169        room_id: OwnedRoomId,
170        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
171        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
172    ) -> Self {
173        Self {
174            inner: Arc::new(RoomEventCacheInner::new(
175                client,
176                state,
177                pagination_status,
178                room_id,
179                auto_shrink_sender,
180                generic_update_sender,
181            )),
182        }
183    }
184
185    /// Read all current events.
186    ///
187    /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
188    /// subscriber.
189    pub async fn events(&self) -> Vec<Event> {
190        let state = self.inner.state.read().await;
191
192        state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect()
193    }
194
195    /// Subscribe to this room's updates, after getting the initial list of
196    /// events.
197    ///
198    /// Use [`RoomEventCache::events`] to get all current events without the
199    /// subscriber. Creating, and especially dropping, a
200    /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
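    ///
    /// A minimal usage sketch (not compiled as a doctest; it assumes an
    /// already-obtained `room_event_cache: &RoomEventCache`):
    ///
    /// ```ignore
    /// // Get the currently loaded events, plus a live subscriber.
    /// let (initial_events, mut subscriber) = room_event_cache.subscribe().await;
    ///
    /// // The subscriber derefs to a broadcast `Receiver`, so updates can be
    /// // awaited directly.
    /// while let Ok(update) = subscriber.recv().await {
    ///     if let RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } = update {
    ///         // Apply the `VectorDiff`s to a local copy of the timeline.
    ///     }
    /// }
    ///
    /// // Dropping the last `RoomEventCacheSubscriber` lets the cache shrink
    /// // its in-memory data back to the last chunk.
    /// ```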
201    pub async fn subscribe(&self) -> (Vec<Event>, RoomEventCacheSubscriber) {
202        let state = self.inner.state.read().await;
203        let events =
204            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
205
206        let previous_subscriber_count = state.subscriber_count.fetch_add(1, Ordering::SeqCst);
207        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
208
209        let recv = self.inner.sender.subscribe();
210        let subscriber = RoomEventCacheSubscriber {
211            recv,
212            room_id: self.inner.room_id.clone(),
213            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
214            subscriber_count: state.subscriber_count.clone(),
215        };
216
217        (events, subscriber)
218    }
219
220    /// Subscribe to the thread for a given root event, and get a (maybe empty)
221    /// initially known list of events for that thread.
222    pub async fn subscribe_to_thread(
223        &self,
224        thread_root: OwnedEventId,
225    ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
226        let mut state = self.inner.state.write().await;
227        state.subscribe_to_thread(thread_root)
228    }
229
230    /// Paginate backwards in a thread, given its root event ID.
231    ///
232    /// Returns whether we've hit the start of the thread, in which case the
233    /// root event will be prepended to the thread.
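    ///
    /// A short sketch (not compiled; it assumes `room_event_cache` and a
    /// known `thread_root: OwnedEventId`), paginating until the start of the
    /// thread has been reached:
    ///
    /// ```ignore
    /// loop {
    ///     let reached_start = room_event_cache
    ///         .paginate_thread_backwards(thread_root.clone(), 20)
    ///         .await?;
    ///     if reached_start {
    ///         break;
    ///     }
    /// }
    /// ```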
234    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
235    pub async fn paginate_thread_backwards(
236        &self,
237        thread_root: OwnedEventId,
238        num_events: u16,
239    ) -> Result<bool> {
240        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
241
242        // Take the lock only for a short time here.
243        let mut outcome =
244            self.inner.state.write().await.load_more_thread_events_backwards(thread_root.clone());
245
246        loop {
247            match outcome {
248                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
249                    // Start a threaded pagination from this gap.
250                    let options = RelationsOptions {
251                        from: prev_token.clone(),
252                        dir: Direction::Backward,
253                        limit: Some(num_events.into()),
254                        include_relations: IncludeRelations::AllRelations,
255                        recurse: true,
256                    };
257
258                    let mut result = room
259                        .relations(thread_root.clone(), options)
260                        .await
261                        .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
262
263                    let reached_start = result.next_batch_token.is_none();
264                    trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");
265
266                    // Because the state lock is taken again in `load_or_fetch_event`, we need
267                    // to do this *before* we take the state lock again.
268                    if reached_start {
269                        // Prepend the thread root event to the results.
270                        let root_event = room
271                            .load_or_fetch_event(&thread_root, None)
272                            .await
273                            .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
274
275                        // Note: the events are still in the reversed order at this point, so
276                        // pushing will eventually make it so that the root event is the first.
277                        result.chunk.push(root_event);
278                    }
279
280                    let mut state = self.inner.state.write().await;
281
282                    if let Some(outcome) = state.finish_thread_network_pagination(
283                        thread_root.clone(),
284                        prev_token,
285                        result.next_batch_token,
286                        result.chunk,
287                    ) {
288                        return Ok(outcome.reached_start);
289                    }
290
291                    // fallthrough: restart the pagination.
292                    outcome = state.load_more_thread_events_backwards(thread_root.clone());
293                }
294
295                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
296                    // We're done!
297                    return Ok(true);
298                }
299
300                LoadMoreEventsBackwardsOutcome::Events { .. } => {
301                    // TODO: implement :)
302                    unimplemented!("loading from disk for threads is not implemented yet");
303                }
304
305                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
306                    unreachable!("unused for threads")
307                }
308            }
309        }
310    }
311
312    /// Return a [`RoomPagination`] API object useful for running
313    /// back-pagination queries in the current room.
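    ///
    /// A hedged sketch (not compiled; `run_backwards_once` is assumed to be
    /// the pagination entry point, to be checked against the current
    /// [`RoomPagination`] API):
    ///
    /// ```ignore
    /// let pagination = room_event_cache.pagination();
    /// // Ask the server for up to 20 older events.
    /// let outcome = pagination.run_backwards_once(20).await?;
    /// if outcome.reached_start {
    ///     // The start of the room timeline has been reached.
    /// }
    /// ```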
314    pub fn pagination(&self) -> RoomPagination {
315        RoomPagination { inner: self.inner.clone() }
316    }
317
318    /// Try to find a single event in this room, starting from the most recent
319    /// event.
320    ///
321    /// **Warning**! It looks into the loaded events from the in-memory linked
322    /// chunk **only**. It doesn't look inside the storage.
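    ///
    /// A minimal sketch of a predicate (not compiled; `needle` stands for
    /// some `&EventId` to look for among the in-memory events):
    ///
    /// ```ignore
    /// let found: Option<Event> = room_event_cache
    ///     .rfind_map_event_in_memory_by(|event| {
    ///         (event.event_id().as_deref() == Some(needle)).then(|| event.clone())
    ///     })
    ///     .await;
    /// ```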
323    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Option<O>
324    where
325        P: FnMut(&Event) -> Option<O>,
326    {
327        self.inner.state.read().await.rfind_map_event_in_memory_by(predicate)
328    }
329
330    /// Try to find an event by ID in this room.
331    ///
332    /// It starts by looking into loaded events before looking inside the
333    /// storage.
334    pub async fn find_event(&self, event_id: &EventId) -> Option<Event> {
335        self.inner
336            .state
337            .read()
338            .await
339            .find_event(event_id)
340            .await
341            .ok()
342            .flatten()
343            .map(|(_loc, event)| event)
344    }
345
346    /// Try to find an event by ID in this room, along with its related events.
347    ///
348    /// You can filter which types of related events to retrieve using
349    /// `filter`. `None` will retrieve related events of any type.
350    ///
351    /// The related events are sorted like this:
352    ///
353    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
354    ///   located at the beginning of the array.
355    /// - events present in the linked chunk (be it in memory or in the storage)
356    ///   will be sorted according to their ordering in the linked chunk.
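    ///
    /// A short sketch (not compiled; it assumes an `event_id` in scope),
    /// fetching an event along with only its edits, i.e. `m.replace`
    /// relations:
    ///
    /// ```ignore
    /// if let Some((target, related)) = room_event_cache
    ///     .find_event_with_relations(&event_id, Some(vec![RelationType::Replacement]))
    ///     .await
    /// {
    ///     // `target` is the event itself, `related` contains its edits.
    /// }
    /// ```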
357    pub async fn find_event_with_relations(
358        &self,
359        event_id: &EventId,
360        filter: Option<Vec<RelationType>>,
361    ) -> Option<(Event, Vec<Event>)> {
362        // Search in all loaded or stored events.
363        self.inner
364            .state
365            .read()
366            .await
367            .find_event_with_relations(event_id, filter.clone())
368            .await
369            .ok()
370            .flatten()
371    }
372
373    /// Clear all the storage for this [`RoomEventCache`].
374    ///
375    /// This will get rid of all the events from the linked chunk and persisted
376    /// storage.
377    pub async fn clear(&self) -> Result<()> {
378        // Clear the linked chunk and persisted storage.
379        let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;
380
381        // Notify observers about the update.
382        let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
383            diffs: updates_as_vector_diffs,
384            origin: EventsOrigin::Cache,
385        });
386
387        // Notify observers about the generic update.
388        let _ = self
389            .inner
390            .generic_update_sender
391            .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });
392
393        Ok(())
394    }
395
396    /// Save some events in the event cache, for further retrieval with
397    /// [`Self::find_event`].
398    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
399        if let Err(err) = self.inner.state.write().await.save_event(events).await {
400            warn!("couldn't save event in the event cache: {err}");
401        }
402    }
403
404    /// Return a nice debug string (a vector of lines) for the linked chunk of
405    /// events for this room.
406    pub async fn debug_string(&self) -> Vec<String> {
407        self.inner.state.read().await.room_linked_chunk().debug_string()
408    }
409}
410
411/// The (non-cloneable) details of the `RoomEventCache`.
412pub(super) struct RoomEventCacheInner {
413    /// The room id for this room.
414    pub(super) room_id: OwnedRoomId,
415
416    pub weak_room: WeakRoom,
417
418    /// Sender part for subscribers to this room.
419    pub sender: Sender<RoomEventCacheUpdate>,
420
421    /// State for this room's event cache.
422    pub state: RwLock<RoomEventCacheState>,
423
424    /// A notifier that we received a new pagination token.
425    pub pagination_batch_token_notifier: Notify,
426
427    pub pagination_status: SharedObservable<RoomPaginationStatus>,
428
429    /// Sender to the auto-shrink channel.
430    ///
431    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
432    /// more details.
433    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
434
435    /// A clone of [`EventCacheInner::generic_update_sender`].
436    ///
437    /// Whilst `EventCacheInner` handles the generic updates from the sync, or
438    /// the storage, it doesn't handle the update from pagination. Having a
439    /// clone here allows accessing it from [`RoomPagination`].
440    pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
441}
442
443impl RoomEventCacheInner {
444    /// Creates a new cache for a room, and subscribes to room updates, so as
445    /// to handle new timeline events.
446    fn new(
447        client: WeakClient,
448        state: RoomEventCacheState,
449        pagination_status: SharedObservable<RoomPaginationStatus>,
450        room_id: OwnedRoomId,
451        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
452        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
453    ) -> Self {
454        let sender = Sender::new(32);
455        let weak_room = WeakRoom::new(client, room_id);
456        Self {
457            room_id: weak_room.room_id().to_owned(),
458            weak_room,
459            state: RwLock::new(state),
460            sender,
461            pagination_batch_token_notifier: Default::default(),
462            auto_shrink_sender,
463            pagination_status,
464            generic_update_sender,
465        }
466    }
467
468    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
469        if account_data.is_empty() {
470            return;
471        }
472
473        let mut handled_read_marker = false;
474
475        trace!("Handling account data");
476
477        for raw_event in account_data {
478            match raw_event.deserialize() {
479                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
480                    // If duplicated, do not forward the read marker multiple times,
481                    // to avoid cluttering the update channel.
482                    if handled_read_marker {
483                        continue;
484                    }
485
486                    handled_read_marker = true;
487
488                    // Propagate to observers. (We ignore the error if there aren't any.)
489                    let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
490                        event_id: ev.content.event_id,
491                    });
492                }
493
494                Ok(_) => {
495                    // We're not interested in other room account data updates,
496                    // at this point.
497                }
498
499                Err(e) => {
500                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
501                    warn!(event_type, "Failed to deserialize account data: {e}");
502                }
503            }
504        }
505    }
506
507    #[instrument(skip_all, fields(room_id = %self.room_id))]
508    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
509        self.handle_timeline(
510            updates.timeline,
511            updates.ephemeral.clone(),
512            updates.ambiguity_changes,
513        )
514        .await?;
515        self.handle_account_data(updates.account_data);
516
517        Ok(())
518    }
519
520    #[instrument(skip_all, fields(room_id = %self.room_id))]
521    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
522        self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
523
524        Ok(())
525    }
526
527    /// Handle a [`Timeline`], i.e. new events received by a sync for this
528    /// room.
529    async fn handle_timeline(
530        &self,
531        timeline: Timeline,
532        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
533        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
534    ) -> Result<()> {
535        if timeline.events.is_empty()
536            && timeline.prev_batch.is_none()
537            && ephemeral_events.is_empty()
538            && ambiguity_changes.is_empty()
539        {
540            return Ok(());
541        }
542
543        // Add all the events to the backend.
544        trace!("adding new events");
545
546        let (stored_prev_batch_token, timeline_event_diffs) =
547            self.state.write().await.handle_sync(timeline).await?;
548
549        // Now that all events have been added, we can trigger the
550        // `pagination_token_notifier`.
551        if stored_prev_batch_token {
552            self.pagination_batch_token_notifier.notify_one();
553        }
554
555        // The order matters here: first send the timeline event diffs, and only then
556        // the related events (read receipts, etc.).
557        if !timeline_event_diffs.is_empty() {
558            let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
559                diffs: timeline_event_diffs,
560                origin: EventsOrigin::Sync,
561            });
562
563            let _ = self
564                .generic_update_sender
565                .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
566        }
567
568        if !ephemeral_events.is_empty() {
569            let _ = self
570                .sender
571                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
572        }
573
574        if !ambiguity_changes.is_empty() {
575            let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
576        }
577
578        Ok(())
579    }
580}
581
582/// Internal type to represent the output of
583/// [`RoomEventCacheState::load_more_events_backwards`].
584#[derive(Debug)]
585pub(super) enum LoadMoreEventsBackwardsOutcome {
586    /// A gap has been inserted.
587    Gap {
588        /// The previous batch token to be used as the "end" parameter in the
589        /// back-pagination request.
590        prev_token: Option<String>,
591    },
592
593    /// The start of the timeline has been reached.
594    StartOfTimeline,
595
596    /// Events have been inserted.
597    Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
598
599    /// The caller must wait for the initial previous-batch token, and retry.
600    WaitForInitialPrevToken,
601}
602
603// Use a private module to hide `events` from this parent module.
604mod private {
605    use std::{
606        collections::{BTreeMap, HashMap, HashSet},
607        sync::{Arc, atomic::AtomicUsize},
608    };
609
610    use eyeball::SharedObservable;
611    use eyeball_im::VectorDiff;
612    use matrix_sdk_base::{
613        apply_redaction,
614        deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
615        event_cache::{
616            Event, Gap,
617            store::{DynEventCacheStore, EventCacheStoreLock},
618        },
619        linked_chunk::{
620            ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
621            OwnedLinkedChunkId, Position, Update,
622            lazy_loader::{self},
623        },
624        serde_helpers::extract_thread_root,
625        sync::Timeline,
626    };
627    use matrix_sdk_common::executor::spawn;
628    use ruma::{
629        EventId, OwnedEventId, OwnedRoomId,
630        events::{
631            AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
632            relation::RelationType, room::redaction::SyncRoomRedactionEvent,
633        },
634        room_version_rules::RoomVersionRules,
635        serde::Raw,
636    };
637    use tokio::sync::broadcast::{Receiver, Sender};
638    use tracing::{debug, error, instrument, trace, warn};
639
640    use super::{
641        super::{EventCacheError, deduplicator::DeduplicationOutcome},
642        EventLocation, LoadMoreEventsBackwardsOutcome,
643        events::EventLinkedChunk,
644        sort_positions_descending,
645    };
646    use crate::event_cache::{
647        BackPaginationOutcome, RoomEventCacheLinkedChunkUpdate, RoomPaginationStatus,
648        ThreadEventCacheUpdate, deduplicator::filter_duplicate_events,
649        room::threads::ThreadEventCache,
650    };
651
652    /// State for a single room's event cache.
653    ///
654    /// This contains all the inner mutable states that ought to be updated at
655    /// the same time.
656    pub struct RoomEventCacheState {
657        /// The room this state relates to.
658        room: OwnedRoomId,
659
660        /// The rules for the version of this room.
661        room_version_rules: RoomVersionRules,
662
663        /// Whether thread support has been enabled for the event cache.
664        enabled_thread_support: bool,
665
666        /// Reference to the underlying backing store.
667        store: EventCacheStoreLock,
668
669        /// The loaded events for the current room, that is, the in-memory
670        /// linked chunk for this room.
671        room_linked_chunk: EventLinkedChunk,
672
673        /// Threads present in this room.
674        ///
675        /// Keyed by the thread root event ID.
676        threads: HashMap<OwnedEventId, ThreadEventCache>,
677
678        /// Have we ever waited for a previous-batch-token to come from sync, in
679        /// the context of pagination? We do this at most once per room,
680        /// the first time we try to run backward pagination. We reset
681        /// that upon clearing the timeline events.
682        pub waited_for_initial_prev_token: bool,
683
684        pagination_status: SharedObservable<RoomPaginationStatus>,
685
686        /// See doc comment of
687        /// [`super::super::EventCacheInner::linked_chunk_update_sender`].
688        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
689
690        /// An atomic count of the current number of subscribers of the
691        /// [`super::RoomEventCache`].
692        pub(super) subscriber_count: Arc<AtomicUsize>,
693    }
694
695    impl RoomEventCacheState {
696        /// Create a new state, or reload it from storage if it's been enabled.
697        ///
698        /// Not all events are going to be loaded. Only a portion of them. The
699        /// [`EventLinkedChunk`] relies on a [`LinkedChunk`] to store all
700        /// events. Only the last chunk will be loaded. It means the
701        /// events are loaded from the most recent to the oldest. To
702        /// load more events, see [`Self::load_more_events_backwards`].
703        ///
704        /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
705        pub async fn new(
706            room_id: OwnedRoomId,
707            room_version_rules: RoomVersionRules,
708            enabled_thread_support: bool,
709            linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
710            store: EventCacheStoreLock,
711            pagination_status: SharedObservable<RoomPaginationStatus>,
712        ) -> Result<Self, EventCacheError> {
713            let store_lock = store.lock().await?;
714
715            let linked_chunk_id = LinkedChunkId::Room(&room_id);
716
717            // Load the full linked chunk's metadata, so as to feed the order tracker.
718            //
719            // If loading the full linked chunk failed, we'll clear the event cache, as it
720            // indicates that at some point, there's some malformed data.
721            let full_linked_chunk_metadata =
722                match Self::load_linked_chunk_metadata(&*store_lock, linked_chunk_id).await {
723                    Ok(metas) => metas,
724                    Err(err) => {
725                        error!(
726                            "error when loading a linked chunk's metadata from the store: {err}"
727                        );
728
729                        // Try to clear storage for this room.
730                        store_lock
731                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
732                            .await?;
733
734                        // Restart with an empty linked chunk.
735                        None
736                    }
737                };
738
739            let linked_chunk = match store_lock
740                .load_last_chunk(linked_chunk_id)
741                .await
742                .map_err(EventCacheError::from)
743                .and_then(|(last_chunk, chunk_identifier_generator)| {
744                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
745                        .map_err(EventCacheError::from)
746                }) {
747                Ok(linked_chunk) => linked_chunk,
748                Err(err) => {
749                    error!(
750                        "error when loading a linked chunk's latest chunk from the store: {err}"
751                    );
752
753                    // Try to clear storage for this room.
754                    store_lock
755                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
756                        .await?;
757
758                    None
759                }
760            };
761
762            let room_linked_chunk = EventLinkedChunk::with_initial_linked_chunk(
763                linked_chunk,
764                full_linked_chunk_metadata,
765            );
766
767            // The threads mapping is intentionally empty at start, since we're going to
768            // reload threads lazily, as soon as we need to (based on external
769            // subscribers) or when we get new information about those (from
770            // sync).
771            let threads = HashMap::new();
772
773            Ok(Self {
774                room: room_id,
775                room_version_rules,
776                enabled_thread_support,
777                store,
778                room_linked_chunk,
779                threads,
780                waited_for_initial_prev_token: false,
781                subscriber_count: Default::default(),
782                pagination_status,
783                linked_chunk_update_sender,
784            })
785        }
786
787        /// Load a linked chunk's full metadata, making sure the chunks are
788        /// correctly chained according to their links.
789        ///
790        /// Returns `None` if there's no such linked chunk in the store, or an
791        /// error if the linked chunk is malformed.
792        async fn load_linked_chunk_metadata(
793            store: &DynEventCacheStore,
794            linked_chunk_id: LinkedChunkId<'_>,
795        ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
796            let mut all_chunks = store
797                .load_all_chunks_metadata(linked_chunk_id)
798                .await
799                .map_err(EventCacheError::from)?;
800
801            if all_chunks.is_empty() {
802                // There are no chunks, so there's nothing to do.
803                return Ok(None);
804            }
805
806            // Transform the vector into a hashmap, for quick lookup of the predecessors.
807            let chunk_map: HashMap<_, _> =
808                all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();
809
810            // Find a last chunk.
811            let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
812            let Some(last) = iter.next() else {
813                return Err(EventCacheError::InvalidLinkedChunkMetadata {
814                    details: "no last chunk found".to_owned(),
815                });
816            };
817
818            // There must be at most one last chunk.
819            if let Some(other_last) = iter.next() {
820                return Err(EventCacheError::InvalidLinkedChunkMetadata {
821                    details: format!(
822                        "chunks {} and {} both claim to be last chunks",
823                        last.identifier.index(),
824                        other_last.identifier.index()
825                    ),
826                });
827            }
828
829            // Rewind the chain back to the first chunk, and do some checks at the same
830            // time.
831            let mut seen = HashSet::new();
832            let mut current = last;
833            loop {
834                // If we've already seen this chunk, there's a cycle somewhere.
835                if !seen.insert(current.identifier) {
836                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
837                        details: format!(
838                            "cycle detected in linked chunk at {}",
839                            current.identifier.index()
840                        ),
841                    });
842                }
843
844                let Some(prev_id) = current.previous else {
845                    // If there's no previous chunk, we're done.
846                    if seen.len() != all_chunks.len() {
847                        return Err(EventCacheError::InvalidLinkedChunkMetadata {
848                            details: format!(
849                                "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
850                                seen.len(),
851                                all_chunks.len()
852                            ),
853                        });
854                    }
855                    break;
856                };
857
858                // If the previous chunk is not in the map, then it's unknown
859                // and missing.
860                let Some(pred_meta) = chunk_map.get(&prev_id) else {
861                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
862                        details: format!(
863                            "missing predecessor {} chunk for {}",
864                            prev_id.index(),
865                            current.identifier.index()
866                        ),
867                    });
868                };
869
870                // If the previous chunk isn't connected to the next, then the link is invalid.
871                if pred_meta.next != Some(current.identifier) {
872                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
873                        details: format!(
874                            "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
875                            pred_meta.identifier.index(),
876                            pred_meta.next.map(|chunk_id| chunk_id.index()),
877                            current.identifier.index()
878                        ),
879                    });
880                }
881
882                current = *pred_meta;
883            }
884
885            // At this point, `current` is the identifier of the first chunk.
886            //
887            // Reorder the resulting vector, by going through the chain of `next` links, and
888            // swapping items into their final position.
889            //
890            // Invariant in this loop: all items in [0..i[ are in their final, correct
891            // position.
892            let mut current = current.identifier;
893            for i in 0..all_chunks.len() {
894                // Find the target metadata.
895                let j = all_chunks
896                    .iter()
897                    .rev()
898                    .position(|meta| meta.identifier == current)
899                    .map(|j| all_chunks.len() - 1 - j)
900                    .expect("the target chunk must be present in the metadata");
901                if i != j {
902                    all_chunks.swap(i, j);
903                }
904                if let Some(next) = all_chunks[i].next {
905                    current = next;
906                }
907            }
908
909            Ok(Some(all_chunks))
910        }
911
912        /// Given a fully-loaded linked chunk with no gaps, return the
913        /// [`LoadMoreEventsBackwardsOutcome`] expected for this room's cache.
914        fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome {
915            // If we never received events for this room, this means we've never
916            // received a sync for that room, because every room must have at least a
917            // room creation event. Otherwise, we have reached the start of the
918            // timeline.
919            if self.room_linked_chunk.events().next().is_some() {
920                // If there's at least one event, this means we've reached the start of the
921                // timeline, since the chunk is fully loaded.
922                trace!("chunk is fully loaded and non-empty: reached_start=true");
923                LoadMoreEventsBackwardsOutcome::StartOfTimeline
924            } else if !self.waited_for_initial_prev_token {
925                // There are no events. Since we haven't yet, wait for an initial previous-batch token.
926                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken
927            } else {
928                // Otherwise, we've already waited, *and* received no previous-batch token from
929                // the sync, *and* there are still no events in the fully-loaded
930                // chunk: start back-pagination from the end of the room.
931                LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
932            }
933        }
934
935        /// Load more events backwards if the last chunk is **not** a gap.
936        pub(in super::super) async fn load_more_events_backwards(
937            &mut self,
938        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
939            // If any in-memory chunk is a gap, don't load more events, and let the caller
940            // resolve the gap.
941            if let Some(prev_token) = self.room_linked_chunk.rgap().map(|gap| gap.prev_token) {
942                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
943            }
944
945            // Because `first_chunk` is not `Send`, get this information before the
946            // `.await` point, so that this `Future` can implement `Send`.
947            let first_chunk_identifier = self
948                .room_linked_chunk
949                .chunks()
950                .next()
951                .expect("a linked chunk is never empty")
952                .identifier();
953
954            let store = self.store.lock().await?;
955
956            // The first chunk is not a gap, we can load its previous chunk.
957            let linked_chunk_id = LinkedChunkId::Room(&self.room);
958            let new_first_chunk = match store
959                .load_previous_chunk(linked_chunk_id, first_chunk_identifier)
960                .await
961            {
962                Ok(Some(new_first_chunk)) => {
963                    // All good, let's continue with this chunk.
964                    new_first_chunk
965                }
966
967                Ok(None) => {
968                    // There's no previous chunk. The chunk is now fully-loaded. Conclude.
969                    return Ok(self.conclude_load_more_for_fully_loaded_chunk());
970                }
971
972                Err(err) => {
973                    error!("error when loading the previous chunk of a linked chunk: {err}");
974
975                    // Clear storage for this room.
976                    store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;
977
978                    // Return the error.
979                    return Err(err.into());
980                }
981            };
982
983            let chunk_content = new_first_chunk.content.clone();
984
985            // We've reached the start on disk, if and only if, there was no chunk prior to
986            // the one we just loaded.
987            //
988            // This value is correct, if and only if, it is used for a chunk content of kind
989            // `Items`.
990            let reached_start = new_first_chunk.previous.is_none();
991
992            if let Err(err) = self.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk) {
993                error!("error when inserting the previous chunk into its linked chunk: {err}");
994
995                // Clear storage for this room.
996                store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;
997
998                // Return the error.
999                return Err(err.into());
1000            }
1001
1002            // ⚠️ Let's not propagate the updates to the store! We already have these data
1003            // in the store! Let's drain them.
1004            let _ = self.room_linked_chunk.store_updates().take();
1005
1006            // However, we want to get updates as `VectorDiff`s.
1007            let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
1008
1009            Ok(match chunk_content {
1010                ChunkContent::Gap(gap) => {
1011                    trace!("reloaded chunk from disk (gap)");
1012                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
1013                }
1014
1015                ChunkContent::Items(events) => {
1016                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
1017                    LoadMoreEventsBackwardsOutcome::Events {
1018                        events,
1019                        timeline_event_diffs,
1020                        reached_start,
1021                    }
1022                }
1023            })
1024        }
1025
1026        /// Unload all the chunks, then reload only the last one.
1027        ///
1028        /// The in-memory updates produced by this call start with a clear of
1029        /// all events; as a result, the caller may override any pending diff
1030        /// updates with the updates collected after calling this function
1031        /// (see [`Self::auto_shrink_if_no_subscribers`] for an example).
1032        ///
1033        /// The persisted storage is left untouched.
1034        pub(super) async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
1035            let store_lock = self.store.lock().await?;
1036
1037            // Attempt to load the last chunk.
1038            let linked_chunk_id = LinkedChunkId::Room(&self.room);
1039            let (last_chunk, chunk_identifier_generator) =
1040                match store_lock.load_last_chunk(linked_chunk_id).await {
1041                    Ok(pair) => pair,
1042
1043                    Err(err) => {
1044                        // If loading the last chunk failed, clear the entire linked chunk.
1045                        error!("error when reloading a linked chunk from memory: {err}");
1046
1047                        // Clear storage for this room.
1048                        store_lock
1049                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
1050                            .await?;
1051
1052                        // Restart with an empty linked chunk.
1053                        (None, ChunkIdentifierGenerator::new_from_scratch())
1054                    }
1055                };
1056
1057            debug!("unloading the linked chunk, and resetting it to its last chunk");
1058
1059            // Remove all the chunks from the linked chunk, except for the last one, and
1060            // update the chunk identifier generator.
1061            if let Err(err) =
1062                self.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
1063            {
1064                error!("error when replacing the linked chunk: {err}");
1065                return self.reset_internal().await;
1066            }
1067
1068            // Let pagination observers know that we may not have reached the start of the
1069            // timeline.
1070            // TODO: likely need to cancel any ongoing pagination.
1071            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1072
1073            // Don't propagate those updates to the store; the shrink only affects the
1074            // in-memory representation. Let's drain those store updates.
1075            let _ = self.room_linked_chunk.store_updates().take();
1076
1077            Ok(())
1078        }
1079
1080        /// Automatically shrink the room if there are no more subscribers, as
1081        /// indicated by the atomic count of active subscribers.
1082        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1083        pub(crate) async fn auto_shrink_if_no_subscribers(
1084            &mut self,
1085        ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
1086            let subscriber_count = self.subscriber_count.load(std::sync::atomic::Ordering::SeqCst);
1087
1088            trace!(subscriber_count, "received request to auto-shrink");
1089
1090            if subscriber_count == 0 {
1091                // There are no more subscribers, so we can shrink the
1092                // events data structure to its last chunk.
1093                self.shrink_to_last_chunk().await?;
1094                Ok(Some(self.room_linked_chunk.updates_as_vector_diffs()))
1095            } else {
1096                Ok(None)
1097            }
1098        }
1099
1100        #[cfg(test)]
1101        pub(crate) async fn force_shrink_to_last_chunk(
1102            &mut self,
1103        ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1104            self.shrink_to_last_chunk().await?;
1105            Ok(self.room_linked_chunk.updates_as_vector_diffs())
1106        }
1107
1108        pub(crate) fn room_event_order(&self, event_pos: Position) -> Option<usize> {
1109            self.room_linked_chunk.event_order(event_pos)
1110        }
1111
1112        /// Removes the bundled relations from an event, if they were present.
1113        ///
1114        /// Only replaces the event if it contains bundled relations.
1115        fn strip_relations_if_present<T>(event: &mut Raw<T>) {
1116            // We're going to get rid of the `unsigned`/`m.relations` field, if it's
1117            // present.
1118            // Use a closure that returns an option so we can quickly short-circuit.
1119            let mut closure = || -> Option<()> {
1120                let mut val: serde_json::Value = event.deserialize_as().ok()?;
1121                let unsigned = val.get_mut("unsigned")?;
1122                let unsigned_obj = unsigned.as_object_mut()?;
1123                if unsigned_obj.remove("m.relations").is_some() {
1124                    *event = Raw::new(&val).ok()?.cast_unchecked();
1125                }
1126                None
1127            };
1128            let _ = closure();
1129        }
1130
1131        fn strip_relations_from_event(ev: &mut Event) {
1132            match &mut ev.kind {
1133                TimelineEventKind::Decrypted(decrypted) => {
1134                    // Remove all the encryption info attached to
1135                    // the bundled events.
1136                    decrypted.unsigned_encryption_info = None;
1137
1138                    // Remove the `unsigned`/`m.relations` field, if needs be.
1139                    Self::strip_relations_if_present(&mut decrypted.event);
1140                }
1141
1142                TimelineEventKind::UnableToDecrypt { event, .. }
1143                | TimelineEventKind::PlainText { event } => {
1144                    Self::strip_relations_if_present(event);
1145                }
1146            }
1147        }
1148
1149        /// Strips the bundled relations from a collection of events.
1150        fn strip_relations_from_events(items: &mut [Event]) {
1151            for ev in items.iter_mut() {
1152                Self::strip_relations_from_event(ev);
1153            }
1154        }
1155
1156        /// Remove events by their position, in `EventLinkedChunk` and in
1157        /// `EventCacheStore`.
1158        ///
1159        /// This method is purposely isolated because it must ensure that
1160        /// positions are sorted appropriately or it can be disastrous.
1161        #[instrument(skip_all)]
1162        async fn remove_events(
1163            &mut self,
1164            in_memory_events: Vec<(OwnedEventId, Position)>,
1165            in_store_events: Vec<(OwnedEventId, Position)>,
1166        ) -> Result<(), EventCacheError> {
1167            // In-store events.
1168            if !in_store_events.is_empty() {
1169                let mut positions = in_store_events
1170                    .into_iter()
1171                    .map(|(_event_id, position)| position)
1172                    .collect::<Vec<_>>();
1173
1174                sort_positions_descending(&mut positions);
1175
1176                let updates = positions
1177                    .into_iter()
1178                    .map(|pos| Update::RemoveItem { at: pos })
1179                    .collect::<Vec<_>>();
1180
1181                self.apply_store_only_updates(updates).await?;
1182            }
1183
1184            // In-memory events.
1185            if in_memory_events.is_empty() {
1186                // Nothing else to do, return early.
1187                return Ok(());
1188            }
1189
1190            // `remove_events_by_position` is responsible for sorting positions.
1191            self.room_linked_chunk
1192                .remove_events_by_position(
1193                    in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
1194                )
1195                .expect("failed to remove an event");
1196
1197            self.propagate_changes().await
1198        }
1199
1200        /// Propagate changes to the underlying storage.
1201        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1202            let updates = self.room_linked_chunk.store_updates().take();
1203            self.send_updates_to_store(updates).await
1204        }
1205
1206        /// Apply some updates that are effective only on the store itself.
1207        ///
1208        /// This method should be used only for updates that happen *outside*
1209        /// the in-memory linked chunk. Such updates must be applied
1210        /// onto the ordering tracker as well as to the persistent
1211        /// storage.
1212        async fn apply_store_only_updates(
1213            &mut self,
1214            updates: Vec<Update<Event, Gap>>,
1215        ) -> Result<(), EventCacheError> {
1216            self.room_linked_chunk.order_tracker.map_updates(&updates);
1217            self.send_updates_to_store(updates).await
1218        }
1219
1220        async fn send_updates_to_store(
1221            &mut self,
1222            mut updates: Vec<Update<Event, Gap>>,
1223        ) -> Result<(), EventCacheError> {
1224            if updates.is_empty() {
1225                return Ok(());
1226            }
1227
1228            // Strip relations from updates which insert or replace items.
1229            for update in updates.iter_mut() {
1230                match update {
1231                    Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
1232                    Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
1233                    // Other update kinds don't involve adding new events.
1234                    Update::NewItemsChunk { .. }
1235                    | Update::NewGapChunk { .. }
1236                    | Update::RemoveChunk(_)
1237                    | Update::RemoveItem { .. }
1238                    | Update::DetachLastItems { .. }
1239                    | Update::StartReattachItems
1240                    | Update::EndReattachItems
1241                    | Update::Clear => {}
1242                }
1243            }
1244
1245            // Spawn a task to make sure that all the changes are effectively forwarded to
1246            // the store, even if the call to this method gets aborted.
1247            //
1248            // The store cross-process locking involves an actual mutex, which ensures that
1249            // storing updates happens in the expected order.
1250
1251            let store = self.store.clone();
1252            let room_id = self.room.clone();
1253            let cloned_updates = updates.clone();
1254
1255            spawn(async move {
1256                let store = store.lock().await?;
1257
1258                trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
1259                let linked_chunk_id = LinkedChunkId::Room(&room_id);
1260                store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
1261                trace!("linked chunk updates applied");
1262
1263                super::Result::Ok(())
1264            })
1265            .await
1266            .expect("joining failed")?;
1267
1268            // Forward that the store got updated to observers.
1269            let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
1270                linked_chunk_id: OwnedLinkedChunkId::Room(self.room.clone()),
1271                updates,
1272            });
1273
1274            Ok(())
1275        }
1276
1277        /// Reset this data structure as if it were brand new.
1278        ///
1279        /// Return a single diff update that is a clear of all events; as a
1280        /// result, the caller may override any pending diff updates
1281        /// with the result of this function.
1282        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1283        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1284            self.reset_internal().await?;
1285
1286            let diff_updates = self.room_linked_chunk.updates_as_vector_diffs();
1287
1288            // Ensure the contract defined in the doc comment is true:
1289            debug_assert_eq!(diff_updates.len(), 1);
1290            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1291
1292            Ok(diff_updates)
1293        }
1294
1295        async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
1296            self.room_linked_chunk.reset();
1297
1298            // No need to update the thread summaries: the room events are gone
1299            // because of the reset of `room_linked_chunk`.
1301            //
1302            // Clear the threads.
1303            for thread in self.threads.values_mut() {
1304                thread.clear();
1305            }
1306
1307            self.propagate_changes().await?;
1308
1309            // Reset the pagination state too: pretend we never waited for the initial
1310            // prev-batch token, and indicate that we're not at the start of the
1311            // timeline, since we don't know about that anymore.
1312            self.waited_for_initial_prev_token = false;
1313            // TODO: likely must cancel any ongoing back-paginations too
1314            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1315
1316            Ok(())
1317        }
1318
1319        /// Returns a read-only reference to the underlying room linked chunk.
1320        pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
1321            &self.room_linked_chunk
1322        }
1323
1324        /// Find a single event in this room, starting from the most recent event.
1325        ///
1326        /// **Warning**! It looks into the loaded events from the in-memory
1327        /// linked chunk **only**. It doesn't look inside the storage,
1328        /// contrary to [`Self::find_event`].
1329        pub fn rfind_map_event_in_memory_by<O, P>(&self, mut predicate: P) -> Option<O>
1330        where
1331            P: FnMut(&Event) -> Option<O>,
1332        {
1333            self.room_linked_chunk.revents().find_map(|(_position, event)| predicate(event))
1334        }
1335
1336        /// Find a single event in this room.
1337        ///
1338        /// It starts by looking into loaded events in `EventLinkedChunk` before
1339        /// looking inside the storage.
1340        pub async fn find_event(
1341            &self,
1342            event_id: &EventId,
1343        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1344            // There are supposedly fewer events loaded in memory than in the store. Let's
1345            // start by looking up in the `EventLinkedChunk`.
1346            for (position, event) in self.room_linked_chunk.revents() {
1347                if event.event_id().as_deref() == Some(event_id) {
1348                    return Ok(Some((EventLocation::Memory(position), event.clone())));
1349                }
1350            }
1351
1352            let store = self.store.lock().await?;
1353
1354            Ok(store
1355                .find_event(&self.room, event_id)
1356                .await?
1357                .map(|event| (EventLocation::Store, event)))
1358        }
1359
1360        /// Find an event and all its relations in the persisted storage.
1361        ///
        /// This goes straight to the database, as a simplification; we don't
        /// expect to need to look up in-memory events, or that all the related
        /// events are actually loaded.
1365        ///
1366        /// The related events are sorted like this:
1367        /// - events saved out-of-band with
1368        ///   [`super::RoomEventCache::save_events`] will be located at the
1369        ///   beginning of the array.
1370        /// - events present in the linked chunk (be it in memory or in the
1371        ///   database) will be sorted according to their ordering in the linked
1372        ///   chunk.
1373        pub async fn find_event_with_relations(
1374            &self,
1375            event_id: &EventId,
1376            filters: Option<Vec<RelationType>>,
1377        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1378            let store = self.store.lock().await?;
1379
1380            // First, hit storage to get the target event and its related events.
1381            let found = store.find_event(&self.room, event_id).await?;
1382
1383            let Some(target) = found else {
1384                // We haven't found the event: return early.
1385                return Ok(None);
1386            };
1387
1388            // Then, initialize the stack with all the related events, to find the
1389            // transitive closure of all the related events.
1390            let mut related =
1391                store.find_event_relations(&self.room, event_id, filters.as_deref()).await?;
1392            let mut stack =
1393                related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();
1394
1395            // Also keep track of already seen events, in case there's a loop in the
1396            // relation graph.
1397            let mut already_seen = HashSet::new();
1398            already_seen.insert(event_id.to_owned());
1399
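            // Start at 1 to account for the initial `find_event_relations` query above.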
1400            let mut num_iters = 1;
1401
1402            // Find the related event for each previously-related event.
1403            while let Some(event_id) = stack.pop() {
1404                if !already_seen.insert(event_id.clone()) {
1405                    // Skip events we've already seen.
1406                    continue;
1407                }
1408
1409                let other_related =
1410                    store.find_event_relations(&self.room, &event_id, filters.as_deref()).await?;
1411
1412                stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
1413                related.extend(other_related);
1414
1415                num_iters += 1;
1416            }
1417
1418            trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
1419
1420            // Sort the results by their positions in the linked chunk, if available.
1421            //
1422            // If an event doesn't have a known position, it goes to the start of the array.
1423            related.sort_by(|(_, lhs), (_, rhs)| {
1424                use std::cmp::Ordering;
1425                match (lhs, rhs) {
1426                    (None, None) => Ordering::Equal,
1427                    (None, Some(_)) => Ordering::Less,
1428                    (Some(_), None) => Ordering::Greater,
1429                    (Some(lhs), Some(rhs)) => {
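                        // Map the positions to an ordering within the room's linked chunk,
                        // if known.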
1430                        let lhs = self.room_event_order(*lhs);
1431                        let rhs = self.room_event_order(*rhs);
1432
                        // The events should have a definite position but, in case they don't,
                        // still consider that not having a position means the event ends up at
                        // the start of the array.
1436                        match (lhs, rhs) {
1437                            (None, None) => Ordering::Equal,
1438                            (None, Some(_)) => Ordering::Less,
1439                            (Some(_), None) => Ordering::Greater,
1440                            (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
1441                        }
1442                    }
1443                }
1444            });
1445
1446            // Keep only the events, not their positions.
1447            let related = related.into_iter().map(|(event, _pos)| event).collect();
1448
1449            Ok(Some((target, related)))
1450        }
1451
1452        /// Post-process new events, after they have been added to the in-memory
1453        /// linked chunk.
1454        ///
1455        /// Flushes updates to disk first.
1456        async fn post_process_new_events(
1457            &mut self,
1458            events: Vec<Event>,
1459            is_sync: bool,
1460        ) -> Result<(), EventCacheError> {
1461            // Update the store before doing the post-processing.
1462            self.propagate_changes().await?;
1463
1464            let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();
1465
1466            for event in events {
1467                self.maybe_apply_new_redaction(&event).await?;
1468
1469                if self.enabled_thread_support {
1470                    if let Some(thread_root) = extract_thread_root(event.raw()) {
1471                        new_events_by_thread.entry(thread_root).or_default().push(event.clone());
1472                    } else if let Some(event_id) = event.event_id() {
                        // If this event is the root of a thread we know about, add it to that
                        // thread's linked chunk.
1474                        if self.threads.contains_key(&event_id) {
1475                            new_events_by_thread.entry(event_id).or_default().push(event.clone());
1476                        }
1477                    }
1478                }
1479
1480                // Save a bundled thread event, if there was one.
1481                if let Some(bundled_thread) = event.bundled_latest_thread_event {
1482                    self.save_event([*bundled_thread]).await?;
1483                }
1484            }
1485
1486            self.update_threads(new_events_by_thread, is_sync).await?;
1487
1488            Ok(())
1489        }
1490
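        /// Get the event cache for the given thread root, creating it if it
        /// doesn't exist yet.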
1491        fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
1492            // TODO: when there's persistent storage, try to lazily reload from disk, if
1493            // missing from memory.
1494            self.threads.entry(root_event_id.clone()).or_insert_with(|| {
1495                ThreadEventCache::new(
1496                    self.room.clone(),
1497                    root_event_id,
1498                    self.linked_chunk_update_sender.clone(),
1499                )
1500            })
1501        }
1502
1503        #[instrument(skip_all)]
1504        async fn update_threads(
1505            &mut self,
1506            new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
1507            is_sync: bool,
1508        ) -> Result<(), EventCacheError> {
1509            for (thread_root, new_events) in new_events_by_thread {
1510                let thread_cache = self.get_or_reload_thread(thread_root.clone());
1511
1512                // If we're not in sync mode, we're receiving events from a room pagination: as
1513                // we don't know where they should be put in a thread linked
1514                // chunk, we don't try to be smart and include them. That's for
1515                // the best.
1516                if is_sync {
1517                    thread_cache.add_live_events(new_events);
1518                }
1519
                // Add a thread summary to the (room) event which is the thread root, if we
                // know about it.
1522
1523                let last_event_id = thread_cache.latest_event_id();
1524
1525                let Some((location, mut target_event)) = self.find_event(&thread_root).await?
1526                else {
1527                    trace!(%thread_root, "thread root event is missing from the linked chunk");
1528                    continue;
1529                };
1530
1531                let prev_summary = target_event.thread_summary.summary();
1532                let mut latest_reply =
1533                    prev_summary.as_ref().and_then(|summary| summary.latest_reply.clone());
1534
1535                // Recompute the thread summary, if needs be.
1536
1537                // Read the latest number of thread replies from the store.
1538                //
1539                // Implementation note: since this is based on the `m.relates_to` field, and
1540                // that field can only be present on room messages, we don't have to
1541                // worry about filtering out aggregation events (like
1542                // reactions/edits/etc.). Pretty neat, huh?
1543                let num_replies = {
1544                    let store_guard = &*self.store.lock().await?;
1545                    let related_thread_events = store_guard
1546                        .find_event_relations(
1547                            &self.room,
1548                            &thread_root,
1549                            Some(&[RelationType::Thread]),
1550                        )
1551                        .await?;
1552                    related_thread_events.len().try_into().unwrap_or(u32::MAX)
1553                };
1554
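                // The latest in-memory thread event, if any, takes precedence as the
                // latest reply.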
1555                if let Some(last_event_id) = last_event_id {
1556                    latest_reply = Some(last_event_id);
1557                }
1558
1559                let new_summary = ThreadSummary { num_replies, latest_reply };
1560
1561                if prev_summary == Some(&new_summary) {
1562                    trace!(%thread_root, "thread summary is already up-to-date");
1563                    continue;
1564                }
1565
1566                // Trigger an update to observers.
1567                target_event.thread_summary = ThreadSummaryStatus::Some(new_summary);
1568                self.replace_event_at(location, target_event).await?;
1569            }
1570
1571            Ok(())
1572        }
1573
1574        /// Replaces a single event, be it saved in memory or in the store.
1575        ///
1576        /// If it was saved in memory, this will emit a notification to
1577        /// observers that a single item has been replaced. Otherwise,
1578        /// such a notification is not emitted, because observers are
1579        /// unlikely to observe the store updates directly.
1580        async fn replace_event_at(
1581            &mut self,
1582            location: EventLocation,
1583            event: Event,
1584        ) -> Result<(), EventCacheError> {
1585            match location {
1586                EventLocation::Memory(position) => {
1587                    self.room_linked_chunk
1588                        .replace_event_at(position, event)
1589                        .expect("should have been a valid position of an item");
1590                    // We just changed the in-memory representation; synchronize this with
1591                    // the store.
1592                    self.propagate_changes().await?;
1593                }
1594                EventLocation::Store => {
1595                    self.save_event([event]).await?;
1596                }
1597            }
1598
1599            Ok(())
1600        }
1601
1602        /// If the given event is a redaction, try to retrieve the
1603        /// to-be-redacted event in the chunk, and replace it by the
1604        /// redacted form.
1605        #[instrument(skip_all)]
1606        async fn maybe_apply_new_redaction(
1607            &mut self,
1608            event: &Event,
1609        ) -> Result<(), EventCacheError> {
1610            let raw_event = event.raw();
1611
            // Do not deserialize the entire event if we aren't certain it's an
            // `m.room.redaction`; this saves a non-negligible amount of computation.
1614            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
1615                raw_event.get_field::<MessageLikeEventType>("type")
1616            else {
1617                return Ok(());
1618            };
1619
            // It is an `m.room.redaction`! We can deserialize it entirely.
1621
1622            let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
1623                redaction,
1624            ))) = raw_event.deserialize()
1625            else {
1626                return Ok(());
1627            };
1628
1629            let Some(event_id) = redaction.redacts(&self.room_version_rules.redaction) else {
1630                warn!("missing target event id from the redaction event");
1631                return Ok(());
1632            };
1633
1634            // Replace the redacted event by a redacted form, if we knew about it.
1635            let Some((location, mut target_event)) = self.find_event(event_id).await? else {
1636                trace!("redacted event is missing from the linked chunk");
1637                return Ok(());
1638            };
1639
1640            // Don't redact already redacted events.
1641            if let Ok(deserialized) = target_event.raw().deserialize() {
1642                match deserialized {
1643                    AnySyncTimelineEvent::MessageLike(ev) => {
1644                        if ev.is_redacted() {
1645                            return Ok(());
1646                        }
1647                    }
1648                    AnySyncTimelineEvent::State(ev) => {
1649                        if ev.is_redacted() {
1650                            return Ok(());
1651                        }
1652                    }
1653                }
1654            }
1655
1656            if let Some(redacted_event) = apply_redaction(
1657                target_event.raw(),
1658                event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
1659                &self.room_version_rules.redaction,
1660            ) {
1661                // It's safe to cast `redacted_event` here:
1662                // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
1663                //   when calling .raw(), so it's still one under the hood.
1664                // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
1665                target_event.replace_raw(redacted_event.cast_unchecked());
1666
1667                self.replace_event_at(location, target_event).await?;
1668            }
1669
1670            Ok(())
1671        }
1672
        /// Save one or more events into the database, without notifying
        /// observers.
        ///
        /// Note: if an event was already saved as part of a linked chunk, and
        /// its event id may have changed, it's not safe to use this method,
        /// because it may break the link between the chunk and the event.
        /// Instead, an update to the linked chunk must be used.
1679        pub async fn save_event(
1680            &self,
1681            events: impl IntoIterator<Item = Event>,
1682        ) -> Result<(), EventCacheError> {
1683            let store = self.store.clone();
1684            let room_id = self.room.clone();
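            // Collect the events eagerly, so they can be moved into the spawned task.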
1685            let events = events.into_iter().collect::<Vec<_>>();
1686
1687            // Spawn a task so the save is uninterrupted by task cancellation.
1688            spawn(async move {
1689                let store = store.lock().await?;
1690                for event in events {
1691                    store.save_event(&room_id, event).await?;
1692                }
1693                super::Result::Ok(())
1694            })
1695            .await
1696            .expect("joining failed")?;
1697
1698            Ok(())
1699        }
1700
1701        /// Handle the result of a sync.
1702        ///
1703        /// It may send room event cache updates to the given sender, if it
1704        /// generated any of those.
1705        ///
1706        /// Returns `true` for the first part of the tuple if a new gap
1707        /// (previous-batch token) has been inserted, `false` otherwise.
1708        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1709        pub async fn handle_sync(
1710            &mut self,
1711            mut timeline: Timeline,
1712        ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
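            // Take the previous-batch token now; it may be dropped below if it turns out
            // not to be needed.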
1713            let mut prev_batch = timeline.prev_batch.take();
1714
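            // Deduplicate the sync events against the ones we already know about, both in
            // memory and in the store.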
1715            let DeduplicationOutcome {
1716                all_events: events,
1717                in_memory_duplicated_event_ids,
1718                in_store_duplicated_event_ids,
1719                non_empty_all_duplicates: all_duplicates,
1720            } = filter_duplicate_events(
1721                &self.store,
1722                LinkedChunkId::Room(self.room.as_ref()),
1723                &self.room_linked_chunk,
1724                timeline.events,
1725            )
1726            .await?;
1727
            // If the timeline isn't limited, and we already knew about some past events,
            // then we definitely know what the timeline head is (either we know about all
            // the events persisted in storage, or we have a gap somewhere). In this case,
            // we can ditch the previous-batch token, which is an optimisation to avoid
            // unnecessary future back-pagination requests.
            //
            // We can also ditch it if we knew about all the events that came from sync,
            // namely, they were all deduplicated. In this case, using the previous-batch
            // token would only result in fetching other events we already knew about.
            // This is slightly incorrect in the presence of network splits, but it has
            // proven to be Good Enough™.
            if (!timeline.limited && self.room_linked_chunk.events().next().is_some())
                || all_duplicates
            {
1743                prev_batch = None;
1744            }
1745
1746            if prev_batch.is_some() {
                // Sad time: there's a gap, somewhere, in the timeline, and there's at least
                // one non-duplicated event. We don't know which threads might have gaps, so
                // we must invalidate them all :(
1750                // TODO(bnjbvr): figure out a better catchup mechanism for threads.
1751                let mut summaries_to_update = Vec::new();
1752
1753                for (thread_root, thread) in self.threads.iter_mut() {
1754                    // Empty the thread's linked chunk.
1755                    thread.clear();
1756
1757                    summaries_to_update.push(thread_root.clone());
1758                }
1759
1760                // Now, update the summaries to indicate that we're not sure what the latest
1761                // thread event is. The thread count can remain as is, as it might still be
1762                // valid, and there's no good value to reset it to, anyways.
1763                for thread_root in summaries_to_update {
1764                    let Some((location, mut target_event)) = self.find_event(&thread_root).await?
1765                    else {
1766                        trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
1767                        continue;
1768                    };
1769
1770                    if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
1771                        prev_summary.latest_reply = None;
1772
1773                        target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);
1774
1775                        self.replace_event_at(location, target_event).await?;
1776                    }
1777                }
1778            }
1779
1780            if all_duplicates {
1781                // No new events and no gap (per the previous check), thus no need to change the
1782                // room state. We're done!
1783                return Ok((false, Vec::new()));
1784            }
1785
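            // At this point, there's at least one new event; if a previous-batch token
            // remains, a new gap will be inserted below.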
1786            let has_new_gap = prev_batch.is_some();
1787
1788            // If we've never waited for an initial previous-batch token, and we've now
1789            // inserted a gap, no need to wait for a previous-batch token later.
1790            if !self.waited_for_initial_prev_token && has_new_gap {
1791                self.waited_for_initial_prev_token = true;
1792            }
1793
1794            // Remove the old duplicated events.
1795            //
            // We don't have to worry about the removals changing the positions of the
            // existing events, because we are pushing all _new_ `events` at the back.
1798            self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1799                .await?;
1800
1801            self.room_linked_chunk
1802                .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);
1803
1804            self.post_process_new_events(events, true).await?;
1805
1806            if timeline.limited && has_new_gap {
                // If there was a previous-batch token for a limited timeline, unload the
                // chunks so the linked chunk only contains the last one; otherwise, there
                // might be a valid gap in between, and observers may not render it (yet).
1810                //
1811                // We must do this *after* persisting these events to storage (in
1812                // `post_process_new_events`).
1813                self.shrink_to_last_chunk().await?;
1814            }
1815
1816            let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
1817
1818            Ok((has_new_gap, timeline_event_diffs))
1819        }
1820
1821        /// Handle the result of a single back-pagination request.
1822        ///
1823        /// If the `prev_token` is set, then this function will check that the
1824        /// corresponding gap is present in the in-memory linked chunk.
1825        /// If it's not the case, `Ok(None)` will be returned, and the
1826        /// caller may decide to do something based on that (e.g. restart a
1827        /// pagination).
1828        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1829        pub async fn handle_backpagination(
1830            &mut self,
1831            events: Vec<Event>,
1832            mut new_token: Option<String>,
1833            prev_token: Option<String>,
1834        ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
1835        {
1836            // Check that the previous token still exists; otherwise it's a sign that the
1837            // room's timeline has been cleared.
1838            let prev_gap_id = if let Some(token) = prev_token {
1839                // Find the corresponding gap in the in-memory linked chunk.
1840                let gap_chunk_id = self.room_linked_chunk.chunk_identifier(|chunk| {
1841                    matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
1842                });
1843
1844                if gap_chunk_id.is_none() {
1845                    // We got a previous-batch token from the linked chunk *before* running the
1846                    // request, but it is missing *after* completing the request.
1847                    //
1848                    // It may be a sign the linked chunk has been reset, but it's fine, per this
1849                    // function's contract.
1850                    return Ok(None);
1851                }
1852
1853                gap_chunk_id
1854            } else {
1855                None
1856            };
1857
1858            let DeduplicationOutcome {
1859                all_events: mut events,
1860                in_memory_duplicated_event_ids,
1861                in_store_duplicated_event_ids,
1862                non_empty_all_duplicates: all_duplicates,
1863            } = filter_duplicate_events(
1864                &self.store,
1865                LinkedChunkId::Room(self.room.as_ref()),
1866                &self.room_linked_chunk,
1867                events,
1868            )
1869            .await?;
1870
1871            // If not all the events have been back-paginated, we need to remove the
1872            // previous ones, otherwise we can end up with misordered events.
1873            //
1874            // Consider the following scenario:
1875            // - sync returns [D, E, F]
1876            // - then sync returns [] with a previous batch token PB1, so the internal
1877            //   linked chunk state is [D, E, F, PB1].
1878            // - back-paginating with PB1 may return [A, B, C, D, E, F].
1879            //
1880            // Only inserting the new events when replacing PB1 would result in a timeline
1881            // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
1882            // all the events, in case this happens (see also #4746).
1883
1884            if !all_duplicates {
1885                // Let's forget all the previous events.
1886                self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1887                    .await?;
1888            } else {
1889                // All new events are duplicated, they can all be ignored.
1890                events.clear();
1891                // The gap can be ditched too, as it won't be useful to backpaginate any
1892                // further.
1893                new_token = None;
1894            }
1895
1896            // `/messages` has been called with `dir=b` (backwards), so the events are in
1897            // the inverted order; reorder them.
1898            let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();
1899
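            // If the server returned another batch token, keep it as a new gap so that we
            // can back-paginate further later.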
1900            let new_gap = new_token.map(|prev_token| Gap { prev_token });
1901            let reached_start = self.room_linked_chunk.finish_back_pagination(
1902                prev_gap_id,
1903                new_gap,
1904                &topo_ordered_events,
1905            );
1906
1907            // Note: this flushes updates to the store.
1908            self.post_process_new_events(topo_ordered_events, false).await?;
1909
1910            let event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
1911
1912            Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
1913        }
1914
        /// Subscribe to the thread for a given root event, and get a (possibly
        /// empty) list of initially known events for that thread.
1917        pub fn subscribe_to_thread(
1918            &mut self,
1919            root: OwnedEventId,
1920        ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
1921            self.get_or_reload_thread(root).subscribe()
1922        }
1923
        /// Finish a network back-pagination for the given thread, applying the
        /// received events and token to its linked chunk.
1927        pub fn finish_thread_network_pagination(
1928            &mut self,
1929            root: OwnedEventId,
1930            prev_token: Option<String>,
1931            new_token: Option<String>,
1932            events: Vec<Event>,
1933        ) -> Option<BackPaginationOutcome> {
1934            self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
1935        }
1936
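        /// Load more events backwards for the given thread.
        ///
        /// Delegates to the thread's own event cache, creating it if it wasn't
        /// known yet.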
1937        pub fn load_more_thread_events_backwards(
1938            &mut self,
1939            root: OwnedEventId,
1940        ) -> LoadMoreEventsBackwardsOutcome {
1941            self.get_or_reload_thread(root).load_more_events_backwards()
1942        }
1943    }
1944}
1945
1946/// An enum representing where an event has been found.
1947pub(super) enum EventLocation {
1948    /// Event lives in memory (and likely in the store!).
1949    Memory(Position),
1950
1951    /// Event lives in the store only, it has not been loaded in memory yet.
1952    Store,
1953}
1954
1955pub(super) use private::RoomEventCacheState;
1956
1957#[cfg(test)]
1958mod tests {
1959    use matrix_sdk_base::event_cache::Event;
1960    use matrix_sdk_test::{async_test, event_factory::EventFactory};
1961    use ruma::{
1962        RoomId, event_id,
1963        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
1964        room_id, user_id,
1965    };
1966
1967    use crate::test_utils::logged_in_client;
1968
1969    #[async_test]
1970    async fn test_find_event_by_id_with_edit_relation() {
1971        let original_id = event_id!("$original");
1972        let related_id = event_id!("$related");
1973        let room_id = room_id!("!galette:saucisse.bzh");
1974        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1975
1976        assert_relations(
1977            room_id,
1978            f.text_msg("Original event").event_id(original_id).into(),
1979            f.text_msg("* An edited event")
1980                .edit(
1981                    original_id,
1982                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
1983                )
1984                .event_id(related_id)
1985                .into(),
1986            f,
1987        )
1988        .await;
1989    }
1990
1991    #[async_test]
1992    async fn test_find_event_by_id_with_thread_reply_relation() {
1993        let original_id = event_id!("$original");
1994        let related_id = event_id!("$related");
1995        let room_id = room_id!("!galette:saucisse.bzh");
1996        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1997
1998        assert_relations(
1999            room_id,
2000            f.text_msg("Original event").event_id(original_id).into(),
2001            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
2002            f,
2003        )
2004        .await;
2005    }
2006
2007    #[async_test]
2008    async fn test_find_event_by_id_with_reaction_relation() {
2009        let original_id = event_id!("$original");
2010        let related_id = event_id!("$related");
2011        let room_id = room_id!("!galette:saucisse.bzh");
2012        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2013
2014        assert_relations(
2015            room_id,
2016            f.text_msg("Original event").event_id(original_id).into(),
2017            f.reaction(original_id, ":D").event_id(related_id).into(),
2018            f,
2019        )
2020        .await;
2021    }
2022
2023    #[async_test]
2024    async fn test_find_event_by_id_with_poll_response_relation() {
2025        let original_id = event_id!("$original");
2026        let related_id = event_id!("$related");
2027        let room_id = room_id!("!galette:saucisse.bzh");
2028        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2029
2030        assert_relations(
2031            room_id,
2032            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2033                .event_id(original_id)
2034                .into(),
2035            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
2036            f,
2037        )
2038        .await;
2039    }
2040
2041    #[async_test]
2042    async fn test_find_event_by_id_with_poll_end_relation() {
2043        let original_id = event_id!("$original");
2044        let related_id = event_id!("$related");
2045        let room_id = room_id!("!galette:saucisse.bzh");
2046        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2047
2048        assert_relations(
2049            room_id,
2050            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2051                .event_id(original_id)
2052                .into(),
2053            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
2054            f,
2055        )
2056        .await;
2057    }
2058
2059    #[async_test]
2060    async fn test_find_event_by_id_with_filtered_relationships() {
2061        let original_id = event_id!("$original");
2062        let related_id = event_id!("$related");
2063        let associated_related_id = event_id!("$recursive_related");
2064        let room_id = room_id!("!galette:saucisse.bzh");
2065        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2066
2067        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2068        let related_event = event_factory
2069            .text_msg("* Edited event")
2070            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2071            .event_id(related_id)
2072            .into();
2073        let associated_related_event =
2074            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
2075
2076        let client = logged_in_client(None).await;
2077
2078        let event_cache = client.event_cache();
2079        event_cache.subscribe().unwrap();
2080
2081        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2082        let room = client.get_room(room_id).unwrap();
2083
2084        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2085
2086        // Save the original event.
2087        room_event_cache.save_events([original_event]).await;
2088
2089        // Save the related event.
2090        room_event_cache.save_events([related_event]).await;
2091
        // Save the associated related event, which reacts to the related event.
2093        room_event_cache.save_events([associated_related_event]).await;
2094
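        // Only consider replacement (edit) relations.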
2095        let filter = Some(vec![RelationType::Replacement]);
2096        let (event, related_events) =
2097            room_event_cache.find_event_with_relations(original_id, filter).await.unwrap();
2098        // Fetched event is the right one.
2099        let cached_event_id = event.event_id().unwrap();
2100        assert_eq!(cached_event_id, original_id);
2101
2102        // There's only the edit event (an edit event can't have its own edit event).
2103        assert_eq!(related_events.len(), 1);
2104
2105        let related_event_id = related_events[0].event_id().unwrap();
2106        assert_eq!(related_event_id, related_id);
2107
        // Now we'll filter by thread relations instead; there should be no related events.
2109        let filter = Some(vec![RelationType::Thread]);
2110        let (event, related_events) =
2111            room_event_cache.find_event_with_relations(original_id, filter).await.unwrap();
2112        // Fetched event is the right one.
2113        let cached_event_id = event.event_id().unwrap();
2114        assert_eq!(cached_event_id, original_id);
        // No thread-related events were found.
2116        assert!(related_events.is_empty());
2117    }
2118
2119    #[async_test]
2120    async fn test_find_event_by_id_with_recursive_relation() {
2121        let original_id = event_id!("$original");
2122        let related_id = event_id!("$related");
2123        let associated_related_id = event_id!("$recursive_related");
2124        let room_id = room_id!("!galette:saucisse.bzh");
2125        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2126
2127        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2128        let related_event = event_factory
2129            .text_msg("* Edited event")
2130            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2131            .event_id(related_id)
2132            .into();
2133        let associated_related_event =
2134            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
2135
2136        let client = logged_in_client(None).await;
2137
2138        let event_cache = client.event_cache();
2139        event_cache.subscribe().unwrap();
2140
2141        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2142        let room = client.get_room(room_id).unwrap();
2143
2144        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2145
2146        // Save the original event.
2147        room_event_cache.save_events([original_event]).await;
2148
2149        // Save the related event.
2150        room_event_cache.save_events([related_event]).await;
2151
        // Save the associated related event, which reacts to the related event.
2153        room_event_cache.save_events([associated_related_event]).await;
2154
2155        let (event, related_events) =
2156            room_event_cache.find_event_with_relations(original_id, None).await.unwrap();
2157        // Fetched event is the right one.
2158        let cached_event_id = event.event_id().unwrap();
2159        assert_eq!(cached_event_id, original_id);
2160
        // Both the directly related event and the recursively related event are present.
2162        assert_eq!(related_events.len(), 2);
2163
2164        let related_event_id = related_events[0].event_id().unwrap();
2165        assert_eq!(related_event_id, related_id);
2166        let related_event_id = related_events[1].event_id().unwrap();
2167        assert_eq!(related_event_id, associated_related_id);
2168    }
2169
2170    async fn assert_relations(
2171        room_id: &RoomId,
2172        original_event: Event,
2173        related_event: Event,
2174        event_factory: EventFactory,
2175    ) {
2176        let client = logged_in_client(None).await;
2177
2178        let event_cache = client.event_cache();
2179        event_cache.subscribe().unwrap();
2180
2181        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2182        let room = client.get_room(room_id).unwrap();
2183
2184        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2185
2186        // Save the original event.
2187        let original_event_id = original_event.event_id().unwrap();
2188        room_event_cache.save_events([original_event]).await;
2189
2190        // Save an unrelated event to check it's not in the related events list.
2191        let unrelated_id = event_id!("$2");
2192        room_event_cache
2193            .save_events([event_factory
2194                .text_msg("An unrelated event")
2195                .event_id(unrelated_id)
2196                .into()])
2197            .await;
2198
2199        // Save the related event.
2200        let related_id = related_event.event_id().unwrap();
2201        room_event_cache.save_events([related_event]).await;
2202
2203        let (event, related_events) =
2204            room_event_cache.find_event_with_relations(&original_event_id, None).await.unwrap();
2205        // Fetched event is the right one.
2206        let cached_event_id = event.event_id().unwrap();
2207        assert_eq!(cached_event_id, original_event_id);
2208
        // Only the actually related event is in the list of related events.
2210        let related_event_id = related_events[0].event_id().unwrap();
2211        assert_eq!(related_event_id, related_id);
2212    }
2213}
2214
2215#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
2216mod timed_tests {
2217    use std::sync::Arc;
2218
2219    use assert_matches::assert_matches;
2220    use assert_matches2::assert_let;
2221    use eyeball_im::VectorDiff;
2222    use futures_util::FutureExt;
2223    use matrix_sdk_base::{
2224        event_cache::{
2225            Gap,
2226            store::{EventCacheStore as _, MemoryStore},
2227        },
2228        linked_chunk::{
2229            ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
2230            lazy_loader::from_all_chunks,
2231        },
2232        store::StoreConfig,
2233        sync::{JoinedRoomUpdate, Timeline},
2234    };
2235    use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
2236    use ruma::{
2237        OwnedUserId, event_id,
2238        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2239        room_id, user_id,
2240    };
2241    use tokio::task::yield_now;
2242
2243    use super::RoomEventCacheGenericUpdate;
2244    use crate::{
2245        assert_let_timeout,
2246        event_cache::{RoomEventCacheUpdate, room::LoadMoreEventsBackwardsOutcome},
2247        test_utils::client::MockClientBuilder,
2248    };
2249
2250    #[async_test]
2251    async fn test_write_to_storage() {
2252        let room_id = room_id!("!galette:saucisse.bzh");
2253        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2254
2255        let event_cache_store = Arc::new(MemoryStore::new());
2256
2257        let client = MockClientBuilder::new(None)
2258            .on_builder(|builder| {
2259                builder.store_config(
2260                    StoreConfig::new("hodlor".to_owned())
2261                        .event_cache_store(event_cache_store.clone()),
2262                )
2263            })
2264            .build()
2265            .await;
2266
2267        let event_cache = client.event_cache();
2268
2269        // Don't forget to subscribe and like^W enable storage!
2270        event_cache.subscribe().unwrap();
2271
2272        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2273        let room = client.get_room(room_id).unwrap();
2274
2275        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2276        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2277
2278        // Propagate an update for a message and a prev-batch token.
2279        let timeline = Timeline {
2280            limited: true,
2281            prev_batch: Some("raclette".to_owned()),
2282            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
2283        };
2284
2285        room_event_cache
2286            .inner
2287            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2288            .await
2289            .unwrap();
2290
2291        // Just checking the generic update is correct.
2292        assert_matches!(
2293            generic_stream.recv().await,
2294            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2295                assert_eq!(expected_room_id, room_id);
2296            }
2297        );
2298
2299        // Check the storage.
2300        let linked_chunk = from_all_chunks::<3, _, _>(
2301            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2302        )
2303        .unwrap()
2304        .unwrap();
2305
2306        assert_eq!(linked_chunk.chunks().count(), 2);
2307
2308        let mut chunks = linked_chunk.chunks();
2309
2310        // We start with the gap.
2311        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
2312            assert_eq!(gap.prev_token, "raclette");
2313        });
2314
2315        // Then we have the stored event.
2316        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2317            assert_eq!(events.len(), 1);
2318            let deserialized = events[0].raw().deserialize().unwrap();
2319            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
2320            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
2321        });
2322
2323        // That's all, folks!
2324        assert!(chunks.next().is_none());
2325    }
2326
2327    #[async_test]
2328    async fn test_write_to_storage_strips_bundled_relations() {
2329        let room_id = room_id!("!galette:saucisse.bzh");
2330        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2331
2332        let event_cache_store = Arc::new(MemoryStore::new());
2333
2334        let client = MockClientBuilder::new(None)
2335            .on_builder(|builder| {
2336                builder.store_config(
2337                    StoreConfig::new("hodlor".to_owned())
2338                        .event_cache_store(event_cache_store.clone()),
2339                )
2340            })
2341            .build()
2342            .await;
2343
2344        let event_cache = client.event_cache();
2345
2346        // Don't forget to subscribe and like^W enable storage!
2347        event_cache.subscribe().unwrap();
2348
2349        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2350        let room = client.get_room(room_id).unwrap();
2351
2352        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2353        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2354
2355        // Propagate an update for a message with bundled relations.
2356        let ev = f
2357            .text_msg("hey yo")
2358            .sender(*ALICE)
2359            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
2360            .into_event();
2361
2362        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
2363
2364        room_event_cache
2365            .inner
2366            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2367            .await
2368            .unwrap();
2369
2370        // Just checking the generic update is correct.
2371        assert_matches!(
2372            generic_stream.recv().await,
2373            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2374                assert_eq!(expected_room_id, room_id);
2375            }
2376        );
2377
2378        // The in-memory linked chunk keeps the bundled relation.
2379        {
2380            let events = room_event_cache.events().await;
2381
2382            assert_eq!(events.len(), 1);
2383
2384            let ev = events[0].raw().deserialize().unwrap();
2385            assert_let!(
2386                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
2387            );
2388
2389            let original = msg.as_original().unwrap();
2390            assert_eq!(original.content.body(), "hey yo");
2391            assert!(original.unsigned.relations.replace.is_some());
2392        }
2393
2394        // The one in storage does not.
2395        let linked_chunk = from_all_chunks::<3, _, _>(
2396            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2397        )
2398        .unwrap()
2399        .unwrap();
2400
2401        assert_eq!(linked_chunk.chunks().count(), 1);
2402
2403        let mut chunks = linked_chunk.chunks();
2404        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2405            assert_eq!(events.len(), 1);
2406
2407            let ev = events[0].raw().deserialize().unwrap();
2408            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
2409
2410            let original = msg.as_original().unwrap();
2411            assert_eq!(original.content.body(), "hey yo");
2412            assert!(original.unsigned.relations.replace.is_none());
2413        });
2414
2415        // That's all, folks!
2416        assert!(chunks.next().is_none());
2417    }
2418
2419    #[async_test]
2420    async fn test_clear() {
2421        let room_id = room_id!("!galette:saucisse.bzh");
2422        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2423
2424        let event_cache_store = Arc::new(MemoryStore::new());
2425
2426        let event_id1 = event_id!("$1");
2427        let event_id2 = event_id!("$2");
2428
2429        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2430        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2431
2432        // Prefill the store with some data.
2433        event_cache_store
2434            .handle_linked_chunk_updates(
2435                LinkedChunkId::Room(room_id),
2436                vec![
2437                    // An empty items chunk.
2438                    Update::NewItemsChunk {
2439                        previous: None,
2440                        new: ChunkIdentifier::new(0),
2441                        next: None,
2442                    },
2443                    // A gap chunk.
2444                    Update::NewGapChunk {
2445                        previous: Some(ChunkIdentifier::new(0)),
2446                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
2447                        new: ChunkIdentifier::new(42),
2448                        next: None,
2449                        gap: Gap { prev_token: "comté".to_owned() },
2450                    },
2451                    // Another items chunk, non-empty this time.
2452                    Update::NewItemsChunk {
2453                        previous: Some(ChunkIdentifier::new(42)),
2454                        new: ChunkIdentifier::new(1),
2455                        next: None,
2456                    },
2457                    Update::PushItems {
2458                        at: Position::new(ChunkIdentifier::new(1), 0),
2459                        items: vec![ev1.clone()],
2460                    },
2461                    // And another items chunk, non-empty again.
2462                    Update::NewItemsChunk {
2463                        previous: Some(ChunkIdentifier::new(1)),
2464                        new: ChunkIdentifier::new(2),
2465                        next: None,
2466                    },
2467                    Update::PushItems {
2468                        at: Position::new(ChunkIdentifier::new(2), 0),
2469                        items: vec![ev2.clone()],
2470                    },
2471                ],
2472            )
2473            .await
2474            .unwrap();
2475
2476        let client = MockClientBuilder::new(None)
2477            .on_builder(|builder| {
2478                builder.store_config(
2479                    StoreConfig::new("hodlor".to_owned())
2480                        .event_cache_store(event_cache_store.clone()),
2481                )
2482            })
2483            .build()
2484            .await;
2485
2486        let event_cache = client.event_cache();
2487
2488        // Don't forget to subscribe and like^W enable storage!
2489        event_cache.subscribe().unwrap();
2490
2491        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2492        let room = client.get_room(room_id).unwrap();
2493
2494        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2495
2496        let (items, mut stream) = room_event_cache.subscribe().await;
2497        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2498
            // The room knows about all cached events.
2500        {
2501            assert!(room_event_cache.find_event(event_id1).await.is_some());
2502            assert!(room_event_cache.find_event(event_id2).await.is_some());
2503        }
2504
        // But only some of the events are loaded from the store.
2506        {
2507            // The room must contain only one event because only one chunk has been loaded.
2508            assert_eq!(items.len(), 1);
2509            assert_eq!(items[0].event_id().unwrap(), event_id2);
2510
2511            assert!(stream.is_empty());
2512        }
2513
2514        // Let's load more chunks to load all events.
2515        {
2516            room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2517
2518            assert_let_timeout!(
2519                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2520            );
2521            assert_eq!(diffs.len(), 1);
2522            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2523                // Here you are `event_id1`!
2524                assert_eq!(event.event_id().unwrap(), event_id1);
2525            });
2526
2527            assert!(stream.is_empty());
2528        }
2529
2530        // After clearing,…
2531        room_event_cache.clear().await.unwrap();
2532
        // … we get an update that the content has been cleared.
2534        assert_let_timeout!(
2535            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2536        );
2537        assert_eq!(diffs.len(), 1);
2538        assert_let!(VectorDiff::Clear = &diffs[0]);
2539
2540        // … same with a generic update.
2541        assert_let_timeout!(
2542            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
2543        );
2544        assert_eq!(received_room_id, room_id);
2545
        // Individual events are not forgotten by the event cache after clearing a
        // room.
2548        assert!(room_event_cache.find_event(event_id1).await.is_some());
2549
2550        // But their presence in a linked chunk is forgotten.
2551        let items = room_event_cache.events().await;
2552        assert!(items.is_empty());
2553
2554        // The event cache store too.
2555        let linked_chunk = from_all_chunks::<3, _, _>(
2556            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2557        )
2558        .unwrap()
2559        .unwrap();
2560
2561        // Note: while the event cache store could return `None` here, clearing it will
2562        // reset it to its initial form, maintaining the invariant that it
2563        // contains a single items chunk that's empty.
2564        assert_eq!(linked_chunk.num_items(), 0);
2565    }
2566
2567    #[async_test]
2568    async fn test_load_from_storage() {
2569        let room_id = room_id!("!galette:saucisse.bzh");
2570        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2571
2572        let event_cache_store = Arc::new(MemoryStore::new());
2573
2574        let event_id1 = event_id!("$1");
2575        let event_id2 = event_id!("$2");
2576
2577        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2578        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2579
2580        // Prefill the store with some data.
2581        event_cache_store
2582            .handle_linked_chunk_updates(
2583                LinkedChunkId::Room(room_id),
2584                vec![
2585                    // An empty items chunk.
2586                    Update::NewItemsChunk {
2587                        previous: None,
2588                        new: ChunkIdentifier::new(0),
2589                        next: None,
2590                    },
2591                    // A gap chunk.
2592                    Update::NewGapChunk {
2593                        previous: Some(ChunkIdentifier::new(0)),
2594                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
2595                        new: ChunkIdentifier::new(42),
2596                        next: None,
2597                        gap: Gap { prev_token: "cheddar".to_owned() },
2598                    },
2599                    // Another items chunk, non-empty this time.
2600                    Update::NewItemsChunk {
2601                        previous: Some(ChunkIdentifier::new(42)),
2602                        new: ChunkIdentifier::new(1),
2603                        next: None,
2604                    },
2605                    Update::PushItems {
2606                        at: Position::new(ChunkIdentifier::new(1), 0),
2607                        items: vec![ev1.clone()],
2608                    },
2609                    // And another items chunk, non-empty again.
2610                    Update::NewItemsChunk {
2611                        previous: Some(ChunkIdentifier::new(1)),
2612                        new: ChunkIdentifier::new(2),
2613                        next: None,
2614                    },
2615                    Update::PushItems {
2616                        at: Position::new(ChunkIdentifier::new(2), 0),
2617                        items: vec![ev2.clone()],
2618                    },
2619                ],
2620            )
2621            .await
2622            .unwrap();
2623
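        // The store now contains: [empty chunk 0] <-> [gap "cheddar" (chunk 42)] <->
        // [chunk 1: ev1] <-> [chunk 2: ev2].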
2624        let client = MockClientBuilder::new(None)
2625            .on_builder(|builder| {
2626                builder.store_config(
2627                    StoreConfig::new("hodlor".to_owned())
2628                        .event_cache_store(event_cache_store.clone()),
2629                )
2630            })
2631            .build()
2632            .await;
2633
2634        let event_cache = client.event_cache();
2635
2636        // Don't forget to subscribe and like^W enable storage!
2637        event_cache.subscribe().unwrap();
2638
2639        // Let's check whether the generic updates are received for the initialisation.
2640        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2641
2642        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2643        let room = client.get_room(room_id).unwrap();
2644
2645        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2646
2647        // The room event cache has been loaded. A generic update must have been
2648        // triggered.
2649        assert_matches!(
2650            generic_stream.recv().await,
2651            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2652                assert_eq!(room_id, expected_room_id);
2653            }
2654        );
2655
2656        let (items, mut stream) = room_event_cache.subscribe().await;
2657
2658        // The initial items contain one event because only the last chunk is loaded by
2659        // default.
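        // (That last chunk is chunk 2, which holds `ev2`.)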
2660        assert_eq!(items.len(), 1);
2661        assert_eq!(items[0].event_id().unwrap(), event_id2);
2662        assert!(stream.is_empty());
2663
        // The event cache still knows about all events though, even if they aren't loaded.
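        // (`find_event` falls back to the event cache store for events that aren't
        // loaded in memory.)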
2665        assert!(room_event_cache.find_event(event_id1).await.is_some());
2666        assert!(room_event_cache.find_event(event_id2).await.is_some());
2667
2668        // Let's paginate to load more events.
2669        room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2670
2671        assert_let_timeout!(
2672            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2673        );
2674        assert_eq!(diffs.len(), 1);
2675        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2676            assert_eq!(event.event_id().unwrap(), event_id1);
2677        });
2678
2679        assert!(stream.is_empty());
2680
2681        // A generic update is triggered too.
2682        assert_matches!(
2683            generic_stream.recv().await,
2684            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2685                assert_eq!(expected_room_id, room_id);
2686            }
2687        );
2688
2689        // A new update with one of these events leads to deduplication.
2690        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
2691
2692        room_event_cache
2693            .inner
2694            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2695            .await
2696            .unwrap();
2697
        // Check the generic update stream: the new event is a duplicate, so no
        // generic update is emitted at all!
2700        assert!(generic_stream.recv().now_or_never().is_none());
2701
        // The stream doesn't report these changes *yet*. Fetch the events directly
        // to check that they now sit at their new positions: the duplicated event is
        // removed (so it's not the first element anymore) and re-added at the back
        // of the list.
2706        let items = room_event_cache.events().await;
2707        assert_eq!(items.len(), 2);
2708        assert_eq!(items[0].event_id().unwrap(), event_id1);
2709        assert_eq!(items[1].event_id().unwrap(), event_id2);
2710    }
2711
2712    #[async_test]
2713    async fn test_load_from_storage_resilient_to_failure() {
2714        let room_id = room_id!("!fondue:patate.ch");
2715        let event_cache_store = Arc::new(MemoryStore::new());
2716
2717        let event = EventFactory::new()
2718            .room(room_id)
2719            .sender(user_id!("@ben:saucisse.bzh"))
2720            .text_msg("foo")
2721            .event_id(event_id!("$42"))
2722            .into_event();
2723
2724        // Prefill the store with invalid data: two chunks that form a cycle.
2725        event_cache_store
2726            .handle_linked_chunk_updates(
2727                LinkedChunkId::Room(room_id),
2728                vec![
2729                    Update::NewItemsChunk {
2730                        previous: None,
2731                        new: ChunkIdentifier::new(0),
2732                        next: None,
2733                    },
2734                    Update::PushItems {
2735                        at: Position::new(ChunkIdentifier::new(0), 0),
2736                        items: vec![event],
2737                    },
2738                    Update::NewItemsChunk {
2739                        previous: Some(ChunkIdentifier::new(0)),
2740                        new: ChunkIdentifier::new(1),
2741                        next: Some(ChunkIdentifier::new(0)),
2742                    },
2743                ],
2744            )
2745            .await
2746            .unwrap();
2747
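        // Chunk 1 lists chunk 0 as both its `previous` and its `next`, while chunk 0
        // claims to have no `next` at all: this chunk list cannot be reloaded
        // consistently.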
2748        let client = MockClientBuilder::new(None)
2749            .on_builder(|builder| {
2750                builder.store_config(
2751                    StoreConfig::new("holder".to_owned())
2752                        .event_cache_store(event_cache_store.clone()),
2753                )
2754            })
2755            .build()
2756            .await;
2757
2758        let event_cache = client.event_cache();
2759
2760        // Don't forget to subscribe and like^W enable storage!
2761        event_cache.subscribe().unwrap();
2762
2763        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2764        let room = client.get_room(room_id).unwrap();
2765
2766        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2767
2768        let items = room_event_cache.events().await;
2769
2770        // Because the persisted content was invalid, the room store is reset: there are
2771        // no events in the cache.
2772        assert!(items.is_empty());
2773
        // Storage doesn't contain anything. It would also be valid for it to contain
        // a single, initial, empty items chunk.
2776        let raw_chunks =
2777            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
2778        assert!(raw_chunks.is_empty());
2779    }
2780
2781    #[async_test]
2782    async fn test_no_useless_gaps() {
2783        let room_id = room_id!("!galette:saucisse.bzh");
2784
2785        let client = MockClientBuilder::new(None).build().await;
2786
2787        let event_cache = client.event_cache();
2788        event_cache.subscribe().unwrap();
2789
2790        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2791        let room = client.get_room(room_id).unwrap();
2792        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2793        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2794
2795        let f = EventFactory::new().room(room_id).sender(*ALICE);
2796
2797        // Propagate an update including a limited timeline with one message and a
2798        // prev-batch token.
2799        room_event_cache
2800            .inner
2801            .handle_joined_room_update(JoinedRoomUpdate {
2802                timeline: Timeline {
2803                    limited: true,
2804                    prev_batch: Some("raclette".to_owned()),
2805                    events: vec![f.text_msg("hey yo").into_event()],
2806                },
2807                ..Default::default()
2808            })
2809            .await
2810            .unwrap();
2811
2812        // Just checking the generic update is correct.
2813        assert_matches!(
2814            generic_stream.recv().await,
2815            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2816                assert_eq!(expected_room_id, room_id);
2817            }
2818        );
2819
2820        {
2821            let mut state = room_event_cache.inner.state.write().await;
2822
2823            let mut num_gaps = 0;
2824            let mut num_events = 0;
2825
2826            for c in state.room_linked_chunk().chunks() {
2827                match c.content() {
2828                    ChunkContent::Items(items) => num_events += items.len(),
2829                    ChunkContent::Gap(_) => num_gaps += 1,
2830                }
2831            }
2832
            // The limited sync unloads the chunk, so it looks as if there are only
            // the events, without the gap.
2835            assert_eq!(num_gaps, 0);
2836            assert_eq!(num_events, 1);
2837
            // But if we manually load more of the linked chunk, the gap becomes visible.
2839            assert_matches!(
2840                state.load_more_events_backwards().await.unwrap(),
2841                LoadMoreEventsBackwardsOutcome::Gap { .. }
2842            );
2843
2844            num_gaps = 0;
2845            num_events = 0;
2846            for c in state.room_linked_chunk().chunks() {
2847                match c.content() {
2848                    ChunkContent::Items(items) => num_events += items.len(),
2849                    ChunkContent::Gap(_) => num_gaps += 1,
2850                }
2851            }
2852
2853            // The gap must have been stored.
2854            assert_eq!(num_gaps, 1);
2855            assert_eq!(num_events, 1);
2856        }
2857
2858        // Now, propagate an update for another message, but the timeline isn't limited
2859        // this time.
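        // Since it isn't limited, no events can be missing in between, so the
        // "fondue" prev-batch token should not be stored as a new gap.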
2860        room_event_cache
2861            .inner
2862            .handle_joined_room_update(JoinedRoomUpdate {
2863                timeline: Timeline {
2864                    limited: false,
2865                    prev_batch: Some("fondue".to_owned()),
2866                    events: vec![f.text_msg("sup").into_event()],
2867                },
2868                ..Default::default()
2869            })
2870            .await
2871            .unwrap();
2872
2873        // Just checking the generic update is correct.
2874        assert_matches!(
2875            generic_stream.recv().await,
2876            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2877                assert_eq!(expected_room_id, room_id);
2878            }
2879        );
2880
2881        {
2882            let state = room_event_cache.inner.state.read().await;
2883
2884            let mut num_gaps = 0;
2885            let mut num_events = 0;
2886
2887            for c in state.room_linked_chunk().chunks() {
2888                match c.content() {
2889                    ChunkContent::Items(items) => num_events += items.len(),
2890                    ChunkContent::Gap(gap) => {
2891                        assert_eq!(gap.prev_token, "raclette");
2892                        num_gaps += 1;
2893                    }
2894                }
2895            }
2896
2897            // There's only the previous gap, no new ones.
2898            assert_eq!(num_gaps, 1);
2899            assert_eq!(num_events, 2);
2900        }
2901    }
2902
2903    #[async_test]
2904    async fn test_shrink_to_last_chunk() {
2905        let room_id = room_id!("!galette:saucisse.bzh");
2906
2907        let client = MockClientBuilder::new(None).build().await;
2908
2909        let f = EventFactory::new().room(room_id);
2910
2911        let evid1 = event_id!("$1");
2912        let evid2 = event_id!("$2");
2913
2914        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
2915        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
2916
        // Fill the event cache store with an initial linked chunk with two event chunks.
2918        {
2919            let store = client.event_cache_store();
2920            let store = store.lock().await.unwrap();
2921            store
2922                .handle_linked_chunk_updates(
2923                    LinkedChunkId::Room(room_id),
2924                    vec![
2925                        Update::NewItemsChunk {
2926                            previous: None,
2927                            new: ChunkIdentifier::new(0),
2928                            next: None,
2929                        },
2930                        Update::PushItems {
2931                            at: Position::new(ChunkIdentifier::new(0), 0),
2932                            items: vec![ev1],
2933                        },
2934                        Update::NewItemsChunk {
2935                            previous: Some(ChunkIdentifier::new(0)),
2936                            new: ChunkIdentifier::new(1),
2937                            next: None,
2938                        },
2939                        Update::PushItems {
2940                            at: Position::new(ChunkIdentifier::new(1), 0),
2941                            items: vec![ev2],
2942                        },
2943                    ],
2944                )
2945                .await
2946                .unwrap();
2947        }
2948
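        // The store now contains two single-event chunks: [chunk 0: ev1] <-> [chunk 1: ev2].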
2949        let event_cache = client.event_cache();
2950        event_cache.subscribe().unwrap();
2951
2952        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2953        let room = client.get_room(room_id).unwrap();
2954        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2955
2956        // Sanity check: lazily loaded, so only includes one item at start.
2957        let (events, mut stream) = room_event_cache.subscribe().await;
2958        assert_eq!(events.len(), 1);
2959        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
2960        assert!(stream.is_empty());
2961
2962        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2963
2964        // Force loading the full linked chunk by back-paginating.
2965        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2966        assert_eq!(outcome.events.len(), 1);
2967        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2968        assert!(outcome.reached_start);
2969
2970        // We also get an update about the loading from the store.
2971        assert_let_timeout!(
2972            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2973        );
2974        assert_eq!(diffs.len(), 1);
2975        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
2976            assert_eq!(value.event_id().as_deref(), Some(evid1));
2977        });
2978
2979        assert!(stream.is_empty());
2980
2981        // Same for the generic update.
2982        assert_let_timeout!(
2983            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
2984        );
2985        assert_eq!(received_room_id, room_id);
2986
2987        // Shrink the linked chunk to the last chunk.
2988        let diffs = room_event_cache
2989            .inner
2990            .state
2991            .write()
2992            .await
2993            .force_shrink_to_last_chunk()
2994            .await
2995            .expect("shrinking should succeed");
2996
2997        // We receive updates about the changes to the linked chunk.
2998        assert_eq!(diffs.len(), 2);
2999        assert_matches!(&diffs[0], VectorDiff::Clear);
        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
3001            assert_eq!(values.len(), 1);
3002            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
3003        });
3004
3005        assert!(stream.is_empty());
3006
3007        // No generic update is sent in this case.
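        // (Shrinking only unloads chunks from memory and doesn't change the room's
        // stored content, hence nothing to broadcast.)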
3008        assert!(generic_stream.is_empty());
3009
3010        // When reading the events, we do get only the last one.
3011        let events = room_event_cache.events().await;
3012        assert_eq!(events.len(), 1);
3013        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3014
        // But if we back-paginate, we don't need network access to find out about
        // the previous event.
3017        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3018        assert_eq!(outcome.events.len(), 1);
3019        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3020        assert!(outcome.reached_start);
3021    }
3022
3023    #[async_test]
3024    async fn test_room_ordering() {
3025        let room_id = room_id!("!galette:saucisse.bzh");
3026
3027        let client = MockClientBuilder::new(None).build().await;
3028
3029        let f = EventFactory::new().room(room_id).sender(*ALICE);
3030
3031        let evid1 = event_id!("$1");
3032        let evid2 = event_id!("$2");
3033        let evid3 = event_id!("$3");
3034
3035        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
3036        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3037        let ev3 = f.text_msg("yo").event_id(evid3).into_event();
3038
        // Fill the event cache store with an initial linked chunk with two event chunks.
3040        {
3041            let store = client.event_cache_store();
3042            let store = store.lock().await.unwrap();
3043            store
3044                .handle_linked_chunk_updates(
3045                    LinkedChunkId::Room(room_id),
3046                    vec![
3047                        Update::NewItemsChunk {
3048                            previous: None,
3049                            new: ChunkIdentifier::new(0),
3050                            next: None,
3051                        },
3052                        Update::PushItems {
3053                            at: Position::new(ChunkIdentifier::new(0), 0),
3054                            items: vec![ev1, ev2],
3055                        },
3056                        Update::NewItemsChunk {
3057                            previous: Some(ChunkIdentifier::new(0)),
3058                            new: ChunkIdentifier::new(1),
3059                            next: None,
3060                        },
3061                        Update::PushItems {
3062                            at: Position::new(ChunkIdentifier::new(1), 0),
3063                            items: vec![ev3.clone()],
3064                        },
3065                    ],
3066                )
3067                .await
3068                .unwrap();
3069        }
3070
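        // The store now contains [chunk 0: ev1, ev2] <-> [chunk 1: ev3], so the
        // expected global ordering is ev1 = 0, ev2 = 1, ev3 = 2.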
3071        let event_cache = client.event_cache();
3072        event_cache.subscribe().unwrap();
3073
3074        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3075        let room = client.get_room(room_id).unwrap();
3076        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3077
3078        // Initially, the linked chunk only contains the last chunk, so only ev3 is
3079        // loaded.
3080        {
3081            let state = room_event_cache.inner.state.read().await;
3082
3083            // But we can get the order of ev1.
3084            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));
3085
3086            // And that of ev2 as well.
3087            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));
3088
3089            // ev3, which is loaded, also has a known ordering.
3090            let mut events = state.room_linked_chunk().events();
3091            let (pos, ev) = events.next().unwrap();
3092            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
3093            assert_eq!(ev.event_id().as_deref(), Some(evid3));
3094            assert_eq!(state.room_event_order(pos), Some(2));
3095
3096            // No other loaded events.
3097            assert!(events.next().is_none());
3098        }
3099
3100        // Force loading the full linked chunk by back-paginating.
3101        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3102        assert!(outcome.reached_start);
3103
3104        // All events are now loaded, so their order is precisely their enumerated index
3105        // in a linear iteration.
3106        {
3107            let state = room_event_cache.inner.state.read().await;
3108            for (i, (pos, _)) in state.room_linked_chunk().events().enumerate() {
3109                assert_eq!(state.room_event_order(pos), Some(i));
3110            }
3111        }
3112
        // Handle a gappy sync with two events (including one duplicate, so
        // deduplication kicks in). This shrinks the linked chunk to its last chunk,
        // which then only contains the last two events.
3117        let evid4 = event_id!("$4");
3118        room_event_cache
3119            .inner
3120            .handle_joined_room_update(JoinedRoomUpdate {
3121                timeline: Timeline {
3122                    limited: true,
3123                    prev_batch: Some("fondue".to_owned()),
3124                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
3125                },
3126                ..Default::default()
3127            })
3128            .await
3129            .unwrap();
3130
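        // After deduplication, ev3's old copy at (chunk 1, index 0) is gone; the
        // re-inserted copy keeps ordering index 2 and the new event (`evid4`) gets
        // index 3, as checked below.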
3131        {
3132            let state = room_event_cache.inner.state.read().await;
3133
3134            // After the shrink, only evid3 and evid4 are loaded.
3135            let mut events = state.room_linked_chunk().events();
3136
3137            let (pos, ev) = events.next().unwrap();
3138            assert_eq!(ev.event_id().as_deref(), Some(evid3));
3139            assert_eq!(state.room_event_order(pos), Some(2));
3140
3141            let (pos, ev) = events.next().unwrap();
3142            assert_eq!(ev.event_id().as_deref(), Some(evid4));
3143            assert_eq!(state.room_event_order(pos), Some(3));
3144
3145            // No other loaded events.
3146            assert!(events.next().is_none());
3147
3148            // But we can still get the order of previous events.
3149            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));
3150            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));
3151
            // ev3's previous position no longer has an order, since the event has
            // been deduplicated (and moved to the new last chunk).
3154            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(1), 0)), None);
3155        }
3156    }
3157
3158    #[async_test]
3159    async fn test_auto_shrink_after_all_subscribers_are_gone() {
3160        let room_id = room_id!("!galette:saucisse.bzh");
3161
3162        let client = MockClientBuilder::new(None).build().await;
3163
3164        let f = EventFactory::new().room(room_id);
3165
3166        let evid1 = event_id!("$1");
3167        let evid2 = event_id!("$2");
3168
3169        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3170        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3171
        // Fill the event cache store with an initial linked chunk with two event chunks.
3173        {
3174            let store = client.event_cache_store();
3175            let store = store.lock().await.unwrap();
3176            store
3177                .handle_linked_chunk_updates(
3178                    LinkedChunkId::Room(room_id),
3179                    vec![
3180                        Update::NewItemsChunk {
3181                            previous: None,
3182                            new: ChunkIdentifier::new(0),
3183                            next: None,
3184                        },
3185                        Update::PushItems {
3186                            at: Position::new(ChunkIdentifier::new(0), 0),
3187                            items: vec![ev1],
3188                        },
3189                        Update::NewItemsChunk {
3190                            previous: Some(ChunkIdentifier::new(0)),
3191                            new: ChunkIdentifier::new(1),
3192                            next: None,
3193                        },
3194                        Update::PushItems {
3195                            at: Position::new(ChunkIdentifier::new(1), 0),
3196                            items: vec![ev2],
3197                        },
3198                    ],
3199                )
3200                .await
3201                .unwrap();
3202        }
3203
3204        let event_cache = client.event_cache();
3205        event_cache.subscribe().unwrap();
3206
3207        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3208        let room = client.get_room(room_id).unwrap();
3209        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3210
3211        // Sanity check: lazily loaded, so only includes one item at start.
3212        let (events1, mut stream1) = room_event_cache.subscribe().await;
3213        assert_eq!(events1.len(), 1);
3214        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
3215        assert!(stream1.is_empty());
3216
3217        // Force loading the full linked chunk by back-paginating.
3218        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3219        assert_eq!(outcome.events.len(), 1);
3220        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3221        assert!(outcome.reached_start);
3222
3223        // We also get an update about the loading from the store. Ignore it, for this
3224        // test's sake.
3225        assert_let_timeout!(
3226            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
3227        );
3228        assert_eq!(diffs.len(), 1);
3229        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3230            assert_eq!(value.event_id().as_deref(), Some(evid1));
3231        });
3232
3233        assert!(stream1.is_empty());
3234
3235        // Have another subscriber.
3236        // Since it's not the first one, and the previous one loaded some more events,
        // the second subscriber sees them all.
3238        let (events2, stream2) = room_event_cache.subscribe().await;
3239        assert_eq!(events2.len(), 2);
3240        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
3241        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
3242        assert!(stream2.is_empty());
3243
3244        // Drop the first stream, and wait a bit.
3245        drop(stream1);
3246        yield_now().await;
3247
3248        // The second stream remains undisturbed.
3249        assert!(stream2.is_empty());
3250
3251        // Now drop the second stream, and wait a bit.
3252        drop(stream2);
3253        yield_now().await;
3254
3255        // The linked chunk must have auto-shrunk by now.
3256
3257        {
            // Check the inner state: the shared subscriber count has dropped back to zero.
3259            let state = room_event_cache.inner.state.read().await;
3260            assert_eq!(state.subscriber_count.load(std::sync::atomic::Ordering::SeqCst), 0);
3261        }
3262
3263        // Getting the events will only give us the latest chunk.
3264        let events3 = room_event_cache.events().await;
3265        assert_eq!(events3.len(), 1);
3266        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
3267    }
3268
3269    #[async_test]
3270    async fn test_rfind_map_event_in_memory_by() {
3271        let user_id = user_id!("@mnt_io:matrix.org");
3272        let room_id = room_id!("!raclette:patate.ch");
3273        let client = MockClientBuilder::new(None).build().await;
3274
3275        let event_factory = EventFactory::new().room(room_id);
3276
3277        let event_id_0 = event_id!("$ev0");
3278        let event_id_1 = event_id!("$ev1");
3279        let event_id_2 = event_id!("$ev2");
3280        let event_id_3 = event_id!("$ev3");
3281
3282        let event_0 =
3283            event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3284        let event_1 =
3285            event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3286        let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3287        let event_3 =
3288            event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3289
        // Fill the event cache store with an initial linked chunk made of two chunks
        // and four events.
3292        {
3293            let store = client.event_cache_store();
3294            let store = store.lock().await.unwrap();
3295            store
3296                .handle_linked_chunk_updates(
3297                    LinkedChunkId::Room(room_id),
3298                    vec![
3299                        Update::NewItemsChunk {
3300                            previous: None,
3301                            new: ChunkIdentifier::new(0),
3302                            next: None,
3303                        },
3304                        Update::PushItems {
3305                            at: Position::new(ChunkIdentifier::new(0), 0),
3306                            items: vec![event_3],
3307                        },
3308                        Update::NewItemsChunk {
3309                            previous: Some(ChunkIdentifier::new(0)),
3310                            new: ChunkIdentifier::new(1),
3311                            next: None,
3312                        },
3313                        Update::PushItems {
3314                            at: Position::new(ChunkIdentifier::new(1), 0),
3315                            items: vec![event_0, event_1, event_2],
3316                        },
3317                    ],
3318                )
3319                .await
3320                .unwrap();
3321        }
3322
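        // The store now contains [chunk 0: event_3] <-> [chunk 1: event_0, event_1,
        // event_2]; only the last chunk gets loaded in memory, so `event_3` won't be
        // reachable by the in-memory searches below.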
3323        let event_cache = client.event_cache();
3324        event_cache.subscribe().unwrap();
3325
3326        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3327        let room = client.get_room(room_id).unwrap();
3328        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3329
3330        // Look for an event from `BOB`: it must be `event_0`.
3331        assert_matches!(
3332            room_event_cache
                .rfind_map_event_in_memory_by(|event| {
                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
                        == Some(*BOB))
                    .then(|| event.event_id())
                })
3336                .await,
3337            Some(event_id) => {
3338                assert_eq!(event_id.as_deref(), Some(event_id_0));
3339            }
3340        );
3341
        // Look for an event from `ALICE`: it must be `event_2`, found before
        // `event_1`, because events are searched in reverse order.
3344        assert_matches!(
3345            room_event_cache
                .rfind_map_event_in_memory_by(|event| {
                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
                        == Some(*ALICE))
                    .then(|| event.event_id())
                })
3349                .await,
3350            Some(event_id) => {
3351                assert_eq!(event_id.as_deref(), Some(event_id_2));
3352            }
3353        );
3354
3355        // Look for an event that is inside the storage, but not loaded.
3356        assert!(
3357            room_event_cache
3358                .rfind_map_event_in_memory_by(|event| {
3359                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
3360                        == Some(user_id))
3361                    .then(|| event.event_id())
3362                })
3363                .await
3364                .is_none()
3365        );
3366
3367        // Look for an event that doesn't exist.
3368        assert!(room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.is_none());
3369    }
3370}