matrix_sdk/event_cache/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All event cache types for a single room.
16
17use std::{
18    collections::BTreeMap,
19    fmt,
20    ops::{Deref, DerefMut},
21    sync::{
22        Arc,
23        atomic::{AtomicUsize, Ordering},
24    },
25};
26
27use events::sort_positions_descending;
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31    deserialized_responses::AmbiguityChange,
32    event_cache::Event,
33    linked_chunk::Position,
34    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
35};
36use ruma::{
37    EventId, OwnedEventId, OwnedRoomId, RoomId,
38    api::Direction,
39    events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
40    serde::Raw,
41};
42use tokio::sync::{
43    Notify,
44    broadcast::{Receiver, Sender},
45    mpsc,
46};
47use tracing::{instrument, trace, warn};
48
49use super::{
50    AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
51    RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
52};
53use crate::{
54    client::WeakClient,
55    event_cache::EventCacheError,
56    room::{IncludeRelations, RelationsOptions, WeakRoom},
57};
58
59pub(super) mod events;
60mod threads;
61
62pub use threads::ThreadEventCacheUpdate;
63
/// A subset of an event cache, for a room.
///
/// This is a thin handle around a reference-counted `RoomEventCacheInner`.
/// Cloning is shallow, and thus is cheap to do.
#[derive(Clone)]
pub struct RoomEventCache {
    /// The shared inner details; every clone of this handle points to the
    /// same instance.
    pub(super) inner: Arc<RoomEventCacheInner>,
}
71
72impl fmt::Debug for RoomEventCache {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        f.debug_struct("RoomEventCache").finish_non_exhaustive()
75    }
76}
77
/// Thin wrapper for a room event cache subscriber, so as to trigger
/// side-effects when all subscribers are gone.
///
/// The current side-effect is: auto-shrinking the [`RoomEventCache`] when no
/// more subscribers are active. This is an optimisation to reduce the amount
/// of data held in memory by a [`RoomEventCache`]: when no more subscribers
/// are active, all data are reduced to the minimum.
///
/// The side-effect takes effect on `Drop`.
///
/// The wrapper dereferences to the underlying [`Receiver`], so it can be used
/// like a regular broadcast receiver.
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheSubscriber {
    /// Underlying receiver of the room event cache's updates.
    recv: Receiver<RoomEventCacheUpdate>,

    /// To which room are we listening?
    room_id: OwnedRoomId,

    /// Sender to the auto-shrink channel.
    ///
    /// Used on `Drop` to send the room id when the last subscriber goes away.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Shared count of the live subscribers for this room.
    ///
    /// It is incremented by [`RoomEventCache::subscribe`] and decremented when
    /// a subscriber is dropped.
    subscriber_count: Arc<AtomicUsize>,
}
101
102impl Drop for RoomEventCacheSubscriber {
103    fn drop(&mut self) {
104        let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
105
106        trace!(
107            "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
108        );
109
110        if previous_subscriber_count == 1 {
111            // We were the last instance of the subscriber; let the auto-shrinker know by
112            // notifying it of our room id.
113
114            let mut room_id = self.room_id.clone();
115
116            // Try to send without waiting for channel capacity, and restart in a spin-loop
117            // if it failed (until a maximum number of attempts is reached, or
118            // the send was successful). The channel shouldn't be super busy in
119            // general, so this should resolve quickly enough.
120
121            let mut num_attempts = 0;
122
123            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
124                num_attempts += 1;
125
126                if num_attempts > 1024 {
127                    // If we've tried too many times, just give up with a warning; after all, this
128                    // is only an optimization.
129                    warn!(
130                        "couldn't send notification to the auto-shrink channel \
131                         after 1024 attempts; giving up"
132                    );
133                    return;
134                }
135
136                match err {
137                    mpsc::error::TrySendError::Full(stolen_room_id) => {
138                        room_id = stolen_room_id;
139                    }
140                    mpsc::error::TrySendError::Closed(_) => return,
141                }
142            }
143
144            trace!("sent notification to the parent channel that we were the last subscriber");
145        }
146    }
147}
148
149impl Deref for RoomEventCacheSubscriber {
150    type Target = Receiver<RoomEventCacheUpdate>;
151
152    fn deref(&self) -> &Self::Target {
153        &self.recv
154    }
155}
156
157impl DerefMut for RoomEventCacheSubscriber {
158    fn deref_mut(&mut self) -> &mut Self::Target {
159        &mut self.recv
160    }
161}
162
163impl RoomEventCache {
164    /// Create a new [`RoomEventCache`] using the given room and store.
165    pub(super) fn new(
166        client: WeakClient,
167        state: RoomEventCacheStateLock,
168        pagination_status: SharedObservable<RoomPaginationStatus>,
169        room_id: OwnedRoomId,
170        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
171        update_sender: Sender<RoomEventCacheUpdate>,
172        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
173    ) -> Self {
174        Self {
175            inner: Arc::new(RoomEventCacheInner::new(
176                client,
177                state,
178                pagination_status,
179                room_id,
180                auto_shrink_sender,
181                update_sender,
182                generic_update_sender,
183            )),
184        }
185    }
186
187    /// Get the room ID for this [`RoomEventCache`].
188    pub fn room_id(&self) -> &RoomId {
189        &self.inner.room_id
190    }
191
192    /// Read all current events.
193    ///
194    /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
195    /// subscriber.
196    pub async fn events(&self) -> Result<Vec<Event>> {
197        let state = self.inner.state.read().await?;
198
199        Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
200    }
201
202    /// Subscribe to this room updates, after getting the initial list of
203    /// events.
204    ///
205    /// Use [`RoomEventCache::events`] to get all current events without the
206    /// subscriber. Creating, and especially dropping, a
207    /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
208    pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
209        let state = self.inner.state.read().await?;
210        let events =
211            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
212
213        let subscriber_count = state.subscriber_count();
214        let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
215        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
216
217        let recv = self.inner.update_sender.subscribe();
218        let subscriber = RoomEventCacheSubscriber {
219            recv,
220            room_id: self.inner.room_id.clone(),
221            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
222            subscriber_count: subscriber_count.clone(),
223        };
224
225        Ok((events, subscriber))
226    }
227
228    /// Subscribe to thread for a given root event, and get a (maybe empty)
229    /// initially known list of events for that thread.
230    pub async fn subscribe_to_thread(
231        &self,
232        thread_root: OwnedEventId,
233    ) -> Result<(Vec<Event>, Receiver<ThreadEventCacheUpdate>)> {
234        let mut state = self.inner.state.write().await?;
235        Ok(state.subscribe_to_thread(thread_root))
236    }
237
    /// Paginate backwards in a thread, given its root event ID.
    ///
    /// `num_events` is used as the `limit` of each `/relations` request sent
    /// to fill a gap.
    ///
    /// Returns whether we've hit the start of the thread, in which case the
    /// root event will be prepended to the thread.
    ///
    /// # Errors
    ///
    /// - [`EventCacheError::ClientDropped`] if the room can't be obtained from
    ///   the weak room handle.
    /// - [`EventCacheError::BackpaginationError`] if the `/relations` request,
    ///   or loading the thread root event, fails.
    /// - Any error raised when taking the state lock or when saving the
    ///   received events.
    ///
    /// # Panics
    ///
    /// Panics if the state reports a `LoadMoreEventsBackwardsOutcome::Events`
    /// outcome: that case isn't implemented yet for threads.
    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
    pub async fn paginate_thread_backwards(
        &self,
        thread_root: OwnedEventId,
        num_events: u16,
    ) -> Result<bool> {
        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;

        // Take the lock only for a short time here: the write guard is a temporary
        // that is released at the end of this statement.
        let mut outcome =
            self.inner.state.write().await?.load_more_thread_events_backwards(thread_root.clone());

        // Each iteration handles one outcome; only the `Gap` case may loop again,
        // after recomputing the outcome under the state lock.
        loop {
            match outcome {
                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
                    // Start a threaded pagination from this gap.
                    let options = RelationsOptions {
                        from: prev_token.clone(),
                        dir: Direction::Backward,
                        limit: Some(num_events.into()),
                        include_relations: IncludeRelations::AllRelations,
                        recurse: true,
                    };

                    let mut result = room
                        .relations(thread_root.clone(), options)
                        .await
                        .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

                    // No further batch token means there is nothing older to fetch.
                    let reached_start = result.next_batch_token.is_none();
                    trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");

                    // Because the state lock is taken again in `load_or_fetch_event`, we need
                    // to do this *before* we take the state lock again.
                    let root_event =
                        if reached_start {
                            // Prepend the thread root event to the results.
                            Some(room.load_or_fetch_event(&thread_root, None).await.map_err(
                                |err| EventCacheError::BackpaginationError(Box::new(err)),
                            )?)
                        } else {
                            None
                        };

                    // From here on, the write lock is held until the end of this match arm.
                    let mut state = self.inner.state.write().await?;

                    // Save all the events (but the thread root) in the store.
                    state.save_events(result.chunk.iter().cloned()).await?;

                    // Note: the events are still in the reversed order at this point, so
                    // pushing will eventually make it so that the root event is the first.
                    result.chunk.extend(root_event);

                    if let Some(outcome) = state.finish_thread_network_pagination(
                        thread_root.clone(),
                        prev_token,
                        result.next_batch_token,
                        result.chunk,
                    ) {
                        return Ok(outcome.reached_start);
                    }

                    // fallthrough: restart the pagination.
                    outcome = state.load_more_thread_events_backwards(thread_root.clone());
                }

                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
                    // We're done!
                    return Ok(true);
                }

                LoadMoreEventsBackwardsOutcome::Events { .. } => {
                    // TODO: implement :)
                    unimplemented!("loading from disk for threads is not implemented yet");
                }
            }
        }
    }
320
321    /// Return a [`RoomPagination`] API object useful for running
322    /// back-pagination queries in the current room.
323    pub fn pagination(&self) -> RoomPagination {
324        RoomPagination { inner: self.inner.clone() }
325    }
326
327    /// Try to find a single event in this room, starting from the most recent
328    /// event.
329    ///
330    /// The `predicate` receives two arguments: the current event, and the
331    /// ID of the _previous_ (older) event.
332    ///
333    /// **Warning**! It looks into the loaded events from the in-memory linked
334    /// chunk **only**. It doesn't look inside the storage.
335    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
336    where
337        P: FnMut(&Event, Option<&Event>) -> Option<O>,
338    {
339        Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
340    }
341
342    /// Try to find an event by ID in this room.
343    ///
344    /// It starts by looking into loaded events before looking inside the
345    /// storage.
346    pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
347        Ok(self
348            .inner
349            .state
350            .read()
351            .await?
352            .find_event(event_id)
353            .await
354            .ok()
355            .flatten()
356            .map(|(_loc, event)| event))
357    }
358
359    /// Try to find an event by ID in this room, along with its related events.
360    ///
361    /// You can filter which types of related events to retrieve using
362    /// `filter`. `None` will retrieve related events of any type.
363    ///
364    /// The related events are sorted like this:
365    ///
366    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
367    ///   located at the beginning of the array.
368    /// - events present in the linked chunk (be it in memory or in the storage)
369    ///   will be sorted according to their ordering in the linked chunk.
370    pub async fn find_event_with_relations(
371        &self,
372        event_id: &EventId,
373        filter: Option<Vec<RelationType>>,
374    ) -> Result<Option<(Event, Vec<Event>)>> {
375        // Search in all loaded or stored events.
376        Ok(self
377            .inner
378            .state
379            .read()
380            .await?
381            .find_event_with_relations(event_id, filter.clone())
382            .await
383            .ok()
384            .flatten())
385    }
386
387    /// Try to find the related events for an event by ID in this room.
388    ///
389    /// You can filter which types of related events to retrieve using
390    /// `filter`. `None` will retrieve related events of any type.
391    ///
392    /// The related events are sorted like this:
393    ///
394    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
395    ///   located at the beginning of the array.
396    /// - events present in the linked chunk (be it in memory or in the storage)
397    ///   will be sorted according to their ordering in the linked chunk.
398    pub async fn find_event_relations(
399        &self,
400        event_id: &EventId,
401        filter: Option<Vec<RelationType>>,
402    ) -> Result<Vec<Event>> {
403        // Search in all loaded or stored events.
404        self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
405    }
406
407    /// Clear all the storage for this [`RoomEventCache`].
408    ///
409    /// This will get rid of all the events from the linked chunk and persisted
410    /// storage.
411    pub async fn clear(&self) -> Result<()> {
412        // Clear the linked chunk and persisted storage.
413        let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
414
415        // Notify observers about the update.
416        let _ = self.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
417            diffs: updates_as_vector_diffs,
418            origin: EventsOrigin::Cache,
419        });
420
421        // Notify observers about the generic update.
422        let _ = self
423            .inner
424            .generic_update_sender
425            .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });
426
427        Ok(())
428    }
429
430    /// Handle a single event from the `SendQueue`.
431    pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
432        self.inner
433            .handle_timeline(
434                Timeline { limited: false, prev_batch: None, events: vec![event] },
435                Vec::new(),
436                BTreeMap::new(),
437            )
438            .await
439    }
440
441    /// Save some events in the event cache, for further retrieval with
442    /// [`Self::event`].
443    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
444        match self.inner.state.write().await {
445            Ok(mut state_guard) => {
446                if let Err(err) = state_guard.save_events(events).await {
447                    warn!("couldn't save event in the event cache: {err}");
448                }
449            }
450
451            Err(err) => {
452                warn!("couldn't save event in the event cache: {err}");
453            }
454        }
455    }
456
457    /// Return a nice debug string (a vector of lines) for the linked chunk of
458    /// events for this room.
459    pub async fn debug_string(&self) -> Vec<String> {
460        match self.inner.state.read().await {
461            Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
462            Err(err) => {
463                warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
464
465                vec![]
466            }
467        }
468    }
469}
470
/// The (non-cloneable) details of the `RoomEventCache`.
///
/// A single instance is shared, behind an `Arc`, by all the clones of a
/// `RoomEventCache`.
pub(super) struct RoomEventCacheInner {
    /// The room id for this room.
    pub(super) room_id: OwnedRoomId,

    /// A weak handle to the room, built from the `WeakClient` and the room id
    /// given at construction.
    ///
    /// `RoomEventCache::paginate_thread_backwards` calls `get()` on it and
    /// fails with `EventCacheError::ClientDropped` when no room is returned.
    pub weak_room: WeakRoom,

    /// State for this room's event cache.
    pub state: RoomEventCacheStateLock,

    /// A notifier that we received a new pagination token.
    ///
    /// It is triggered when handling a timeline results in a previous-batch
    /// token being stored.
    pub pagination_batch_token_notifier: Notify,

    /// The observable pagination status for this room, as handed in at
    /// construction.
    pub pagination_status: SharedObservable<RoomPaginationStatus>,

    /// Sender to the auto-shrink channel.
    ///
    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
    /// more details.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Sender part for update subscribers to this room.
    pub update_sender: Sender<RoomEventCacheUpdate>,

    /// A clone of [`EventCacheInner::generic_update_sender`].
    ///
    /// Whilst `EventCacheInner` handles the generic updates from the sync, or
    /// the storage, it doesn't handle the update from pagination. Having a
    /// clone here allows to access it from [`RoomPagination`].
    pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
}
502
503impl RoomEventCacheInner {
504    /// Creates a new cache for a room, and subscribes to room updates, so as
505    /// to handle new timeline events.
506    fn new(
507        client: WeakClient,
508        state: RoomEventCacheStateLock,
509        pagination_status: SharedObservable<RoomPaginationStatus>,
510        room_id: OwnedRoomId,
511        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
512        update_sender: Sender<RoomEventCacheUpdate>,
513        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
514    ) -> Self {
515        let weak_room = WeakRoom::new(client, room_id);
516
517        Self {
518            room_id: weak_room.room_id().to_owned(),
519            weak_room,
520            state,
521            update_sender,
522            pagination_batch_token_notifier: Default::default(),
523            auto_shrink_sender,
524            pagination_status,
525            generic_update_sender,
526        }
527    }
528
529    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
530        if account_data.is_empty() {
531            return;
532        }
533
534        let mut handled_read_marker = false;
535
536        trace!("Handling account data");
537
538        for raw_event in account_data {
539            match raw_event.deserialize() {
540                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
541                    // If duplicated, do not forward read marker multiple times
542                    // to avoid clutter the update channel.
543                    if handled_read_marker {
544                        continue;
545                    }
546
547                    handled_read_marker = true;
548
549                    // Propagate to observers. (We ignore the error if there aren't any.)
550                    let _ = self.update_sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
551                        event_id: ev.content.event_id,
552                    });
553                }
554
555                Ok(_) => {
556                    // We're not interested in other room account data updates,
557                    // at this point.
558                }
559
560                Err(e) => {
561                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
562                    warn!(event_type, "Failed to deserialize account data: {e}");
563                }
564            }
565        }
566    }
567
568    #[instrument(skip_all, fields(room_id = %self.room_id))]
569    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
570        self.handle_timeline(
571            updates.timeline,
572            updates.ephemeral.clone(),
573            updates.ambiguity_changes,
574        )
575        .await?;
576        self.handle_account_data(updates.account_data);
577
578        Ok(())
579    }
580
581    #[instrument(skip_all, fields(room_id = %self.room_id))]
582    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
583        self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
584
585        Ok(())
586    }
587
588    /// Handle a [`Timeline`], i.e. new events received by a sync for this
589    /// room.
590    async fn handle_timeline(
591        &self,
592        timeline: Timeline,
593        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
594        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
595    ) -> Result<()> {
596        if timeline.events.is_empty()
597            && timeline.prev_batch.is_none()
598            && ephemeral_events.is_empty()
599            && ambiguity_changes.is_empty()
600        {
601            return Ok(());
602        }
603
604        // Add all the events to the backend.
605        trace!("adding new events");
606
607        let (stored_prev_batch_token, timeline_event_diffs) =
608            self.state.write().await?.handle_sync(timeline).await?;
609
610        // Now that all events have been added, we can trigger the
611        // `pagination_token_notifier`.
612        if stored_prev_batch_token {
613            self.pagination_batch_token_notifier.notify_one();
614        }
615
616        // The order matters here: first send the timeline event diffs, then only the
617        // related events (read receipts, etc.).
618        if !timeline_event_diffs.is_empty() {
619            let _ = self.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
620                diffs: timeline_event_diffs,
621                origin: EventsOrigin::Sync,
622            });
623
624            let _ = self
625                .generic_update_sender
626                .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
627        }
628
629        if !ephemeral_events.is_empty() {
630            let _ = self
631                .update_sender
632                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
633        }
634
635        if !ambiguity_changes.is_empty() {
636            let _ =
637                self.update_sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
638        }
639
640        Ok(())
641    }
642}
643
/// Internal type to represent the output of
/// [`RoomEventCacheState::load_more_events_backwards`].
///
/// It is also the outcome of `load_more_thread_events_backwards`, as matched
/// upon in `RoomEventCache::paginate_thread_backwards`.
#[derive(Debug)]
pub(super) enum LoadMoreEventsBackwardsOutcome {
    /// A gap has been inserted.
    Gap {
        /// The previous batch token to be used as the "end" parameter in the
        /// back-pagination request.
        ///
        /// `RoomEventCache::paginate_thread_backwards` passes it as the `from`
        /// option of its `/relations` request.
        prev_token: Option<String>,
    },

    /// The start of the timeline has been reached.
    StartOfTimeline,

    /// Events have been inserted.
    ///
    /// NOTE(review): presumably `events` are the inserted events,
    /// `timeline_event_diffs` the matching diffs for observers, and
    /// `reached_start` whether the start of the timeline was hit — confirm
    /// against the code producing this variant.
    Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
}
661
// Use a private module to hide `events` from this parent module.
663mod private {
664    use std::{
665        collections::{BTreeMap, HashMap, HashSet},
666        sync::{
667            Arc,
668            atomic::{AtomicBool, AtomicUsize, Ordering},
669        },
670    };
671
672    use eyeball::SharedObservable;
673    use eyeball_im::VectorDiff;
674    use itertools::Itertools;
675    use matrix_sdk_base::{
676        apply_redaction,
677        deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
678        event_cache::{
679            Event, Gap,
680            store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState},
681        },
682        linked_chunk::{
683            ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
684            OwnedLinkedChunkId, Position, Update, lazy_loader,
685        },
686        serde_helpers::{extract_edit_target, extract_thread_root},
687        sync::Timeline,
688    };
689    use matrix_sdk_common::executor::spawn;
690    use ruma::{
691        EventId, OwnedEventId, OwnedRoomId, RoomId,
692        events::{
693            AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
694            relation::RelationType, room::redaction::SyncRoomRedactionEvent,
695        },
696        room_version_rules::RoomVersionRules,
697        serde::Raw,
698    };
699    use tokio::sync::{
700        Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
701        broadcast::{Receiver, Sender},
702    };
703    use tracing::{debug, error, instrument, trace, warn};
704
705    use super::{
706        super::{
707            BackPaginationOutcome, EventCacheError, RoomEventCacheLinkedChunkUpdate,
708            RoomPaginationStatus, ThreadEventCacheUpdate,
709            deduplicator::{DeduplicationOutcome, filter_duplicate_events},
710            room::threads::ThreadEventCache,
711        },
712        EventLocation, EventsOrigin, LoadMoreEventsBackwardsOutcome, RoomEventCacheGenericUpdate,
713        RoomEventCacheUpdate,
714        events::EventLinkedChunk,
715        sort_positions_descending,
716    };
717
    /// State for a single room's event cache.
    ///
    /// This contains all the inner mutable states that ought to be updated at
    /// the same time.
    ///
    /// The state itself lives behind a read-write lock; callers reach it
    /// through the `read` and `write` methods.
    pub struct RoomEventCacheStateLock {
        /// The per-thread lock around the real state.
        locked_state: RwLock<RoomEventCacheStateLockInner>,

        /// Please see inline comment of [`Self::read`] to understand why it
        /// exists.
        ///
        /// The mutex carries no data (`()`).
        read_lock_acquisition: Mutex<()>,
    }
730
    /// The actual state of a room's event cache, protected by
    /// [`RoomEventCacheStateLock`].
    struct RoomEventCacheStateLockInner {
        /// Whether thread support has been enabled for the event cache.
        enabled_thread_support: bool,

        /// The room this state relates to.
        room_id: OwnedRoomId,

        /// Reference to the underlying backing store.
        store: EventCacheStoreLock,

        /// The loaded events for the current room, that is, the in-memory
        /// linked chunk for this room.
        room_linked_chunk: EventLinkedChunk,

        /// Threads present in this room.
        ///
        /// Keyed by the thread root event ID.
        threads: HashMap<OwnedEventId, ThreadEventCache>,

        /// The observable pagination status, as handed in at construction.
        ///
        /// NOTE(review): presumably the same observable as
        /// `RoomEventCacheInner::pagination_status` — confirm against the
        /// caller.
        pagination_status: SharedObservable<RoomPaginationStatus>,

        /// A clone of [`super::RoomEventCacheInner::update_sender`].
        ///
        /// This is used only by the [`RoomEventCacheStateLock::read`] and
        /// [`RoomEventCacheStateLock::write`] when the state must be reset.
        update_sender: Sender<RoomEventCacheUpdate>,

        /// A clone of [`super::super::EventCacheInner::generic_update_sender`].
        ///
        /// This is used only by the [`RoomEventCacheStateLock::read`] and
        /// [`RoomEventCacheStateLock::write`] when the state must be reset.
        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

        /// A clone of
        /// [`super::super::EventCacheInner::linked_chunk_update_sender`].
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

        /// The rules for the version of this room.
        room_version_rules: RoomVersionRules,

        /// Have we ever waited for a previous-batch-token to come from sync, in
        /// the context of pagination? We do this at most once per room,
        /// the first time we try to run backward pagination. We reset
        /// that upon clearing the timeline events.
        waited_for_initial_prev_token: Arc<AtomicBool>,

        /// An atomic count of the current number of subscribers of the
        /// [`super::RoomEventCache`].
        ///
        /// The same counter is shared with every
        /// `RoomEventCacheSubscriber`, which decrements it when dropped.
        subscriber_count: Arc<AtomicUsize>,
    }
781
782    impl RoomEventCacheStateLock {
783        /// Create a new state, or reload it from storage if it's been enabled.
784        ///
785        /// Not all events are going to be loaded. Only a portion of them. The
786        /// [`EventLinkedChunk`] relies on a [`LinkedChunk`] to store all
787        /// events. Only the last chunk will be loaded. It means the
788        /// events are loaded from the most recent to the oldest. To
789        /// load more events, see [`RoomPagination`].
790        ///
791        /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
792        /// [`RoomPagination`]: super::RoomPagination
793        #[allow(clippy::too_many_arguments)]
794        pub async fn new(
795            room_id: OwnedRoomId,
796            room_version_rules: RoomVersionRules,
797            enabled_thread_support: bool,
798            update_sender: Sender<RoomEventCacheUpdate>,
799            generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
800            linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
801            store: EventCacheStoreLock,
802            pagination_status: SharedObservable<RoomPaginationStatus>,
803        ) -> Result<Self, EventCacheError> {
804            let store_guard = match store.lock().await? {
805                // Lock is clean: all good!
806                EventCacheStoreLockState::Clean(guard) => guard,
807
808                // Lock is dirty, not a problem, it's the first time we are creating this state, no
809                // need to refresh.
810                EventCacheStoreLockState::Dirty(guard) => {
811                    EventCacheStoreLockGuard::clear_dirty(&guard);
812
813                    guard
814                }
815            };
816
817            let linked_chunk_id = LinkedChunkId::Room(&room_id);
818
819            // Load the full linked chunk's metadata, so as to feed the order tracker.
820            //
821            // If loading the full linked chunk failed, we'll clear the event cache, as it
822            // indicates that at some point, there's some malformed data.
823            let full_linked_chunk_metadata =
824                match load_linked_chunk_metadata(&store_guard, linked_chunk_id).await {
825                    Ok(metas) => metas,
826                    Err(err) => {
827                        error!(
828                            "error when loading a linked chunk's metadata from the store: {err}"
829                        );
830
831                        // Try to clear storage for this room.
832                        store_guard
833                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
834                            .await?;
835
836                        // Restart with an empty linked chunk.
837                        None
838                    }
839                };
840
841            let linked_chunk = match store_guard
842                .load_last_chunk(linked_chunk_id)
843                .await
844                .map_err(EventCacheError::from)
845                .and_then(|(last_chunk, chunk_identifier_generator)| {
846                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
847                        .map_err(EventCacheError::from)
848                }) {
849                Ok(linked_chunk) => linked_chunk,
850                Err(err) => {
851                    error!(
852                        "error when loading a linked chunk's latest chunk from the store: {err}"
853                    );
854
855                    // Try to clear storage for this room.
856                    store_guard
857                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
858                        .await?;
859
860                    None
861                }
862            };
863
864            let waited_for_initial_prev_token = Arc::new(AtomicBool::new(false));
865
866            Ok(Self {
867                locked_state: RwLock::new(RoomEventCacheStateLockInner {
868                    enabled_thread_support,
869                    room_id,
870                    store,
871                    room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
872                        linked_chunk,
873                        full_linked_chunk_metadata,
874                    ),
875                    // The threads mapping is intentionally empty at start, since we're going to
876                    // reload threads lazily, as soon as we need to (based on external
877                    // subscribers) or when we get new information about those (from
878                    // sync).
879                    threads: HashMap::new(),
880                    pagination_status,
881                    update_sender,
882                    generic_update_sender,
883                    linked_chunk_update_sender,
884                    room_version_rules,
885                    waited_for_initial_prev_token,
886                    subscriber_count: Default::default(),
887                }),
888                read_lock_acquisition: Mutex::new(()),
889            })
890        }
891
892        /// Lock this [`RoomEventCacheStateLock`] with per-thread shared access.
893        ///
894        /// This method locks the per-thread lock over the state, and then locks
895        /// the cross-process lock over the store. It returns an RAII guard
896        /// which will drop the read access to the state and to the store when
897        /// dropped.
898        ///
899        /// If the cross-process lock over the store is dirty (see
900        /// [`EventCacheStoreLockState`]), the state is reset to the last chunk.
901        pub async fn read(&self) -> Result<RoomEventCacheStateLockReadGuard<'_>, EventCacheError> {
902            // Only one call at a time to `read` is allowed.
903            //
904            // Why? Because in case the cross-process lock over the store is dirty, we need
905            // to upgrade the read lock over the state to a write lock.
906            //
907            // ## Upgradable read lock
908            //
909            // One may argue that this upgrades can be done with an _upgradable read lock_
910            // [^1] [^2]. We don't want to use this solution: an upgradable read lock is
911            // basically a mutex because we are losing the shared access property, i.e.
912            // having multiple read locks at the same time. This is an important property to
913            // hold for performance concerns.
914            //
915            // ## Downgradable write lock
916            //
917            // One may also argue we could first obtain a write lock over the state from the
918            // beginning, thus removing the need to upgrade the read lock to a write lock.
919            // The write lock is then downgraded to a read lock once the dirty is cleaned
920            // up. It can potentially create a deadlock in the following situation:
921            //
922            // - `read` is called once, it takes a write lock, then downgrades it to a read
923            //   lock: the guard is kept alive somewhere,
924            // - `read` is called again, and waits to obtain the write lock, which is
925            //   impossible as long as the guard from the previous call is not dropped.
926            //
927            // ## “Atomic” read and write
928            //
929            // One may finally argue to first obtain a read lock over the state, then drop
930            // it if the cross-process lock over the store is dirty, and immediately obtain
931            // a write lock (which can later be downgraded to a read lock). The problem is
932            // that this write lock is async: anything can happen between the drop and the
933            // new lock acquisition, and it's not possible to pause the runtime in the
934            // meantime.
935            //
936            // ## Semaphore with 1 permit, aka a Mutex
937            //
938            // The chosen idea is to allow only one execution at a time of this method: it
939            // becomes a critical section. That way we are free to “upgrade” the read lock
940            // by dropping it and obtaining a new write lock. All callers to this method are
941            // waiting, so nothing can happen in the meantime.
942            //
943            // Note that it doesn't conflict with the `write` method because this later
944            // immediately obtains a write lock, which avoids any conflict with this method.
945            //
946            // [^1]: https://docs.rs/lock_api/0.4.14/lock_api/struct.RwLock.html#method.upgradable_read
947            // [^2]: https://docs.rs/async-lock/3.4.1/async_lock/struct.RwLock.html#method.upgradable_read
948            let _one_reader_guard = self.read_lock_acquisition.lock().await;
949
950            // Obtain a read lock.
951            let state_guard = self.locked_state.read().await;
952
953            match state_guard.store.lock().await? {
954                EventCacheStoreLockState::Clean(store_guard) => {
955                    Ok(RoomEventCacheStateLockReadGuard { state: state_guard, store: store_guard })
956                }
957                EventCacheStoreLockState::Dirty(store_guard) => {
958                    // Drop the read lock, and take a write lock to modify the state.
959                    // This is safe because only one reader at a time (see
960                    // `Self::read_lock_acquisition`) is allowed.
961                    drop(state_guard);
962                    let state_guard = self.locked_state.write().await;
963
964                    let mut guard = RoomEventCacheStateLockWriteGuard {
965                        state: state_guard,
966                        store: store_guard,
967                    };
968
969                    // Force to reload by shrinking to the last chunk.
970                    let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;
971
972                    // All good now, mark the cross-process lock as non-dirty.
973                    EventCacheStoreLockGuard::clear_dirty(&guard.store);
974
975                    // Downgrade the guard as soon as possible.
976                    let guard = guard.downgrade();
977
978                    // Now let the world know about the reload.
979                    if !updates_as_vector_diffs.is_empty() {
980                        // Notify observers about the update.
981                        let _ = guard.state.update_sender.send(
982                            RoomEventCacheUpdate::UpdateTimelineEvents {
983                                diffs: updates_as_vector_diffs,
984                                origin: EventsOrigin::Cache,
985                            },
986                        );
987
988                        // Notify observers about the generic update.
989                        let _ =
990                            guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
991                                room_id: guard.state.room_id.clone(),
992                            });
993                    }
994
995                    Ok(guard)
996                }
997            }
998        }
999
1000        /// Lock this [`RoomEventCacheStateLock`] with exclusive per-thread
1001        /// write access.
1002        ///
1003        /// This method locks the per-thread lock over the state, and then locks
1004        /// the cross-process lock over the store. It returns an RAII guard
1005        /// which will drop the write access to the state and to the store when
1006        /// dropped.
1007        ///
1008        /// If the cross-process lock over the store is dirty (see
1009        /// [`EventCacheStoreLockState`]), the state is reset to the last chunk.
1010        pub async fn write(
1011            &self,
1012        ) -> Result<RoomEventCacheStateLockWriteGuard<'_>, EventCacheError> {
1013            let state_guard = self.locked_state.write().await;
1014
1015            match state_guard.store.lock().await? {
1016                EventCacheStoreLockState::Clean(store_guard) => {
1017                    Ok(RoomEventCacheStateLockWriteGuard { state: state_guard, store: store_guard })
1018                }
1019                EventCacheStoreLockState::Dirty(store_guard) => {
1020                    let mut guard = RoomEventCacheStateLockWriteGuard {
1021                        state: state_guard,
1022                        store: store_guard,
1023                    };
1024
1025                    // Force to reload by shrinking to the last chunk.
1026                    let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;
1027
1028                    // All good now, mark the cross-process lock as non-dirty.
1029                    EventCacheStoreLockGuard::clear_dirty(&guard.store);
1030
1031                    // Now let the world know about the reload.
1032                    if !updates_as_vector_diffs.is_empty() {
1033                        // Notify observers about the update.
1034                        let _ = guard.state.update_sender.send(
1035                            RoomEventCacheUpdate::UpdateTimelineEvents {
1036                                diffs: updates_as_vector_diffs,
1037                                origin: EventsOrigin::Cache,
1038                            },
1039                        );
1040
1041                        // Notify observers about the generic update.
1042                        let _ =
1043                            guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
1044                                room_id: guard.state.room_id.clone(),
1045                            });
1046                    }
1047
1048                    Ok(guard)
1049                }
1050            }
1051        }
1052    }
1053
1054    /// The read lock guard returned by [`RoomEventCacheStateLock::read`].
1055    pub struct RoomEventCacheStateLockReadGuard<'a> {
1056        /// The per-thread read lock guard over the
1057        /// [`RoomEventCacheStateLockInner`].
1058        state: RwLockReadGuard<'a, RoomEventCacheStateLockInner>,
1059
1060        /// The cross-process lock guard over the store.
1061        store: EventCacheStoreLockGuard,
1062    }
1063
1064    /// The write lock guard return by [`RoomEventCacheStateLock::write`].
1065    pub struct RoomEventCacheStateLockWriteGuard<'a> {
1066        /// The per-thread write lock guard over the
1067        /// [`RoomEventCacheStateLockInner`].
1068        state: RwLockWriteGuard<'a, RoomEventCacheStateLockInner>,
1069
1070        /// The cross-process lock guard over the store.
1071        store: EventCacheStoreLockGuard,
1072    }
1073
1074    impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
1075        /// Synchronously downgrades a write lock into a read lock.
1076        ///
1077        /// The per-thread/state lock is downgraded atomically, without allowing
1078        /// any writers to take exclusive access of the lock in the meantime.
1079        ///
1080        /// It returns an RAII guard which will drop the write access to the
1081        /// state and to the store when dropped.
1082        fn downgrade(self) -> RoomEventCacheStateLockReadGuard<'a> {
1083            RoomEventCacheStateLockReadGuard { state: self.state.downgrade(), store: self.store }
1084        }
1085    }
1086
1087    impl<'a> RoomEventCacheStateLockReadGuard<'a> {
1088        /// Returns a read-only reference to the underlying room linked chunk.
1089        pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
1090            &self.state.room_linked_chunk
1091        }
1092
1093        pub fn subscriber_count(&self) -> &Arc<AtomicUsize> {
1094            &self.state.subscriber_count
1095        }
1096
1097        /// Find a single event in this room.
1098        ///
1099        /// It starts by looking into loaded events in `EventLinkedChunk` before
1100        /// looking inside the storage.
1101        pub async fn find_event(
1102            &self,
1103            event_id: &EventId,
1104        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1105            find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1106                .await
1107        }
1108
1109        /// Find an event and all its relations in the persisted storage.
1110        ///
1111        /// This goes straight to the database, as a simplification; we don't
1112        /// expect to need to have to look up in memory events, or that
1113        /// all the related events are actually loaded.
1114        ///
1115        /// The related events are sorted like this:
1116        /// - events saved out-of-band with
1117        ///   [`super::RoomEventCache::save_events`] will be located at the
1118        ///   beginning of the array.
1119        /// - events present in the linked chunk (be it in memory or in the
1120        ///   database) will be sorted according to their ordering in the linked
1121        ///   chunk.
1122        pub async fn find_event_with_relations(
1123            &self,
1124            event_id: &EventId,
1125            filters: Option<Vec<RelationType>>,
1126        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1127            find_event_with_relations(
1128                event_id,
1129                &self.state.room_id,
1130                filters,
1131                &self.state.room_linked_chunk,
1132                &self.store,
1133            )
1134            .await
1135        }
1136
1137        /// Find all relations for an event in the persisted storage.
1138        ///
1139        /// This goes straight to the database, as a simplification; we don't
1140        /// expect to need to have to look up in memory events, or that
1141        /// all the related events are actually loaded.
1142        ///
1143        /// The related events are sorted like this:
1144        /// - events saved out-of-band with
1145        ///   [`super::RoomEventCache::save_events`] will be located at the
1146        ///   beginning of the array.
1147        /// - events present in the linked chunk (be it in memory or in the
1148        ///   database) will be sorted according to their ordering in the linked
1149        ///   chunk.
1150        pub async fn find_event_relations(
1151            &self,
1152            event_id: &EventId,
1153            filters: Option<Vec<RelationType>>,
1154        ) -> Result<Vec<Event>, EventCacheError> {
1155            find_event_relations(
1156                event_id,
1157                &self.state.room_id,
1158                filters,
1159                &self.state.room_linked_chunk,
1160                &self.store,
1161            )
1162            .await
1163        }
1164
1165        //// Find a single event in this room, starting from the most recent event.
1166        ///
1167        /// The `predicate` receives two arguments: the current event, and the
1168        /// ID of the _previous_ (older) event.
1169        ///
1170        /// **Warning**! It looks into the loaded events from the in-memory
1171        /// linked chunk **only**. It doesn't look inside the storage,
1172        /// contrary to [`Self::find_event`].
1173        pub fn rfind_map_event_in_memory_by<'i, O, P>(&'i self, mut predicate: P) -> Option<O>
1174        where
1175            P: FnMut(&'i Event, Option<&'i Event>) -> Option<O>,
1176        {
1177            self.state
1178                .room_linked_chunk
1179                .revents()
1180                .peekable()
1181                .batching(|iter| {
1182                    iter.next().map(|(_position, event)| {
1183                        (event, iter.peek().map(|(_next_position, next_event)| *next_event))
1184                    })
1185                })
1186                .find_map(|(event, next_event)| predicate(event, next_event))
1187        }
1188
1189        #[cfg(test)]
1190        pub fn is_dirty(&self) -> bool {
1191            EventCacheStoreLockGuard::is_dirty(&self.store)
1192        }
1193    }
1194
1195    impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
1196        /// Returns a write reference to the underlying room linked chunk.
1197        #[cfg(any(feature = "e2e-encryption", test))]
1198        pub fn room_linked_chunk(&mut self) -> &mut EventLinkedChunk {
1199            &mut self.state.room_linked_chunk
1200        }
1201
1202        /// Get a reference to the `waited_for_initial_prev_token` atomic bool.
1203        pub fn waited_for_initial_prev_token(&self) -> &Arc<AtomicBool> {
1204            &self.state.waited_for_initial_prev_token
1205        }
1206
1207        /// Find a single event in this room.
1208        ///
1209        /// It starts by looking into loaded events in `EventLinkedChunk` before
1210        /// looking inside the storage.
1211        pub async fn find_event(
1212            &self,
1213            event_id: &EventId,
1214        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1215            find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1216                .await
1217        }
1218
1219        /// Find an event and all its relations in the persisted storage.
1220        ///
1221        /// This goes straight to the database, as a simplification; we don't
1222        /// expect to need to have to look up in memory events, or that
1223        /// all the related events are actually loaded.
1224        ///
1225        /// The related events are sorted like this:
1226        /// - events saved out-of-band with
1227        ///   [`super::RoomEventCache::save_events`] will be located at the
1228        ///   beginning of the array.
1229        /// - events present in the linked chunk (be it in memory or in the
1230        ///   database) will be sorted according to their ordering in the linked
1231        ///   chunk.
1232        pub async fn find_event_with_relations(
1233            &self,
1234            event_id: &EventId,
1235            filters: Option<Vec<RelationType>>,
1236        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1237            find_event_with_relations(
1238                event_id,
1239                &self.state.room_id,
1240                filters,
1241                &self.state.room_linked_chunk,
1242                &self.store,
1243            )
1244            .await
1245        }
1246
1247        /// Load more events backwards if the last chunk is **not** a gap.
1248        pub async fn load_more_events_backwards(
1249            &mut self,
1250        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
1251            // If any in-memory chunk is a gap, don't load more events, and let the caller
1252            // resolve the gap.
1253            if let Some(prev_token) = self.state.room_linked_chunk.rgap().map(|gap| gap.prev_token)
1254            {
1255                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
1256            }
1257
1258            let prev_first_chunk = self
1259                .state
1260                .room_linked_chunk
1261                .chunks()
1262                .next()
1263                .expect("a linked chunk is never empty");
1264
1265            // The first chunk is not a gap, we can load its previous chunk.
1266            let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
1267            let new_first_chunk = match self
1268                .store
1269                .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
1270                .await
1271            {
1272                Ok(Some(new_first_chunk)) => {
1273                    // All good, let's continue with this chunk.
1274                    new_first_chunk
1275                }
1276
1277                Ok(None) => {
1278                    // If we never received events for this room, this means we've never received a
1279                    // sync for that room, because every room must have *at least* a room creation
1280                    // event. Otherwise, we have reached the start of the timeline.
1281
1282                    if self.state.room_linked_chunk.events().next().is_some() {
1283                        // If there's at least one event, this means we've reached the start of the
1284                        // timeline, since the chunk is fully loaded.
1285                        trace!("chunk is fully loaded and non-empty: reached_start=true");
1286                        return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
1287                    }
1288
1289                    // Otherwise, start back-pagination from the end of the room.
1290                    return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: None });
1291                }
1292
1293                Err(err) => {
1294                    error!("error when loading the previous chunk of a linked chunk: {err}");
1295
1296                    // Clear storage for this room.
1297                    self.store
1298                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
1299                        .await?;
1300
1301                    // Return the error.
1302                    return Err(err.into());
1303                }
1304            };
1305
1306            let chunk_content = new_first_chunk.content.clone();
1307
1308            // We've reached the start on disk, if and only if, there was no chunk prior to
1309            // the one we just loaded.
1310            //
1311            // This value is correct, if and only if, it is used for a chunk content of kind
1312            // `Items`.
1313            let reached_start = new_first_chunk.previous.is_none();
1314
1315            if let Err(err) =
1316                self.state.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk)
1317            {
1318                error!("error when inserting the previous chunk into its linked chunk: {err}");
1319
1320                // Clear storage for this room.
1321                self.store
1322                    .handle_linked_chunk_updates(
1323                        LinkedChunkId::Room(&self.state.room_id),
1324                        vec![Update::Clear],
1325                    )
1326                    .await?;
1327
1328                // Return the error.
1329                return Err(err.into());
1330            }
1331
1332            // ⚠️ Let's not propagate the updates to the store! We already have these data
1333            // in the store! Let's drain them.
1334            let _ = self.state.room_linked_chunk.store_updates().take();
1335
1336            // However, we want to get updates as `VectorDiff`s.
1337            let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1338
1339            Ok(match chunk_content {
1340                ChunkContent::Gap(gap) => {
1341                    trace!("reloaded chunk from disk (gap)");
1342                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
1343                }
1344
1345                ChunkContent::Items(events) => {
1346                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
1347                    LoadMoreEventsBackwardsOutcome::Events {
1348                        events,
1349                        timeline_event_diffs,
1350                        reached_start,
1351                    }
1352                }
1353            })
1354        }
1355
1356        /// If storage is enabled, unload all the chunks, then reloads only the
1357        /// last one.
1358        ///
1359        /// If storage's enabled, return a diff update that starts with a clear
1360        /// of all events; as a result, the caller may override any
1361        /// pending diff updates with the result of this function.
1362        ///
1363        /// Otherwise, returns `None`.
1364        pub async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
1365            // Attempt to load the last chunk.
1366            let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
1367            let (last_chunk, chunk_identifier_generator) =
1368                match self.store.load_last_chunk(linked_chunk_id).await {
1369                    Ok(pair) => pair,
1370
1371                    Err(err) => {
1372                        // If loading the last chunk failed, clear the entire linked chunk.
1373                        error!("error when reloading a linked chunk from memory: {err}");
1374
1375                        // Clear storage for this room.
1376                        self.store
1377                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
1378                            .await?;
1379
1380                        // Restart with an empty linked chunk.
1381                        (None, ChunkIdentifierGenerator::new_from_scratch())
1382                    }
1383                };
1384
1385            debug!("unloading the linked chunk, and resetting it to its last chunk");
1386
1387            // Remove all the chunks from the linked chunks, except for the last one, and
1388            // updates the chunk identifier generator.
1389            if let Err(err) =
1390                self.state.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
1391            {
1392                error!("error when replacing the linked chunk: {err}");
1393                return self.reset_internal().await;
1394            }
1395
1396            // Let pagination observers know that we may have not reached the start of the
1397            // timeline.
1398            // TODO: likely need to cancel any ongoing pagination.
1399            self.state
1400                .pagination_status
1401                .set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1402
1403            // Don't propagate those updates to the store; this is only for the in-memory
1404            // representation that we're doing this. Let's drain those store updates.
1405            let _ = self.state.room_linked_chunk.store_updates().take();
1406
1407            Ok(())
1408        }
1409
1410        /// Automatically shrink the room if there are no more subscribers, as
1411        /// indicated by the atomic number of active subscribers.
1412        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1413        pub async fn auto_shrink_if_no_subscribers(
1414            &mut self,
1415        ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
1416            let subscriber_count = self.state.subscriber_count.load(Ordering::SeqCst);
1417
1418            trace!(subscriber_count, "received request to auto-shrink");
1419
1420            if subscriber_count == 0 {
1421                // If we are the last strong reference to the auto-shrinker, we can shrink the
1422                // events data structure to its last chunk.
1423                self.shrink_to_last_chunk().await?;
1424
1425                Ok(Some(self.state.room_linked_chunk.updates_as_vector_diffs()))
1426            } else {
1427                Ok(None)
1428            }
1429        }
1430
1431        /// Force to shrink the room, whenever there is subscribers or not.
1432        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1433        pub async fn force_shrink_to_last_chunk(
1434            &mut self,
1435        ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1436            self.shrink_to_last_chunk().await?;
1437
1438            Ok(self.state.room_linked_chunk.updates_as_vector_diffs())
1439        }
1440
1441        /// Remove events by their position, in `EventLinkedChunk` and in
1442        /// `EventCacheStore`.
1443        ///
1444        /// This method is purposely isolated because it must ensure that
1445        /// positions are sorted appropriately or it can be disastrous.
1446        #[instrument(skip_all)]
1447        pub async fn remove_events(
1448            &mut self,
1449            in_memory_events: Vec<(OwnedEventId, Position)>,
1450            in_store_events: Vec<(OwnedEventId, Position)>,
1451        ) -> Result<(), EventCacheError> {
1452            // In-store events.
1453            if !in_store_events.is_empty() {
1454                let mut positions = in_store_events
1455                    .into_iter()
1456                    .map(|(_event_id, position)| position)
1457                    .collect::<Vec<_>>();
1458
1459                sort_positions_descending(&mut positions);
1460
1461                let updates = positions
1462                    .into_iter()
1463                    .map(|pos| Update::RemoveItem { at: pos })
1464                    .collect::<Vec<_>>();
1465
1466                self.apply_store_only_updates(updates).await?;
1467            }
1468
1469            // In-memory events.
1470            if in_memory_events.is_empty() {
1471                // Nothing else to do, return early.
1472                return Ok(());
1473            }
1474
1475            // `remove_events_by_position` is responsible of sorting positions.
1476            self.state
1477                .room_linked_chunk
1478                .remove_events_by_position(
1479                    in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
1480                )
1481                .expect("failed to remove an event");
1482
1483            self.propagate_changes().await
1484        }
1485
1486        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1487            let updates = self.state.room_linked_chunk.store_updates().take();
1488            self.send_updates_to_store(updates).await
1489        }
1490
1491        /// Apply some updates that are effective only on the store itself.
1492        ///
1493        /// This method should be used only for updates that happen *outside*
1494        /// the in-memory linked chunk. Such updates must be applied
1495        /// onto the ordering tracker as well as to the persistent
1496        /// storage.
1497        async fn apply_store_only_updates(
1498            &mut self,
1499            updates: Vec<Update<Event, Gap>>,
1500        ) -> Result<(), EventCacheError> {
1501            self.state.room_linked_chunk.order_tracker.map_updates(&updates);
1502            self.send_updates_to_store(updates).await
1503        }
1504
1505        async fn send_updates_to_store(
1506            &mut self,
1507            mut updates: Vec<Update<Event, Gap>>,
1508        ) -> Result<(), EventCacheError> {
1509            if updates.is_empty() {
1510                return Ok(());
1511            }
1512
1513            // Strip relations from updates which insert or replace items.
1514            for update in updates.iter_mut() {
1515                match update {
1516                    Update::PushItems { items, .. } => strip_relations_from_events(items),
1517                    Update::ReplaceItem { item, .. } => strip_relations_from_event(item),
1518                    // Other update kinds don't involve adding new events.
1519                    Update::NewItemsChunk { .. }
1520                    | Update::NewGapChunk { .. }
1521                    | Update::RemoveChunk(_)
1522                    | Update::RemoveItem { .. }
1523                    | Update::DetachLastItems { .. }
1524                    | Update::StartReattachItems
1525                    | Update::EndReattachItems
1526                    | Update::Clear => {}
1527                }
1528            }
1529
1530            // Spawn a task to make sure that all the changes are effectively forwarded to
1531            // the store, even if the call to this method gets aborted.
1532            //
1533            // The store cross-process locking involves an actual mutex, which ensures that
1534            // storing updates happens in the expected order.
1535
1536            let store = self.store.clone();
1537            let room_id = self.state.room_id.clone();
1538            let cloned_updates = updates.clone();
1539
1540            spawn(async move {
1541                trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
1542                let linked_chunk_id = LinkedChunkId::Room(&room_id);
1543                store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
1544                trace!("linked chunk updates applied");
1545
1546                super::Result::Ok(())
1547            })
1548            .await
1549            .expect("joining failed")?;
1550
1551            // Forward that the store got updated to observers.
1552            let _ = self.state.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
1553                linked_chunk_id: OwnedLinkedChunkId::Room(self.state.room_id.clone()),
1554                updates,
1555            });
1556
1557            Ok(())
1558        }
1559
1560        /// Reset this data structure as if it were brand new.
1561        ///
1562        /// Return a single diff update that is a clear of all events; as a
1563        /// result, the caller may override any pending diff updates
1564        /// with the result of this function.
1565        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1566            self.reset_internal().await?;
1567
1568            let diff_updates = self.state.room_linked_chunk.updates_as_vector_diffs();
1569
1570            // Ensure the contract defined in the doc comment is true:
1571            debug_assert_eq!(diff_updates.len(), 1);
1572            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1573
1574            Ok(diff_updates)
1575        }
1576
1577        async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
1578            self.state.room_linked_chunk.reset();
1579
1580            // No need to update the thread summaries: the room events are
1581            // gone because of the reset of `room_linked_chunk`.
1582            //
1583            // Clear the threads.
1584            for thread in self.state.threads.values_mut() {
1585                thread.clear();
1586            }
1587
1588            self.propagate_changes().await?;
1589
1590            // Reset the pagination state too: pretend we never waited for the initial
1591            // prev-batch token, and indicate that we're not at the start of the
1592            // timeline, since we don't know about that anymore.
1593            self.state.waited_for_initial_prev_token.store(false, Ordering::SeqCst);
1594            // TODO: likely must cancel any ongoing back-paginations too
1595            self.state
1596                .pagination_status
1597                .set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1598
1599            Ok(())
1600        }
1601
        /// Handle the result of a sync.
        ///
        /// The timeline's events are deduplicated against the events already
        /// known (in memory and in the store), the previous-batch token is
        /// kept or ditched, old duplicates are removed, and the new events are
        /// pushed at the back of the room linked chunk before being
        /// post-processed.
        ///
        /// Returns a tuple where:
        /// - the first part is `true` if a new gap (previous-batch token) has
        ///   been inserted, `false` otherwise,
        /// - the second part contains the `VectorDiff` updates read from the
        ///   room linked chunk; the caller must propagate them (see the
        ///   `must_use` attribute).
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn handle_sync(
            &mut self,
            mut timeline: Timeline,
        ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
            let mut prev_batch = timeline.prev_batch.take();

            let DeduplicationOutcome {
                all_events: events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
                non_empty_all_duplicates: all_duplicates,
            } = filter_duplicate_events(
                &self.store,
                LinkedChunkId::Room(&self.state.room_id),
                &self.state.room_linked_chunk,
                timeline.events,
            )
            .await?;

            // If the timeline isn't limited, and we already knew about some past events,
            // then this definitely knows what the timeline head is (either we know
            // about all the events persisted in storage, or we have a gap
            // somewhere). In this case, we can ditch the previous-batch
            // token, which is an optimization to avoid unnecessary future back-pagination
            // requests.
            //
            // We can also ditch it if we knew about all the events that came from sync,
            // namely, they were all deduplicated. In this case, using the
            // previous-batch token would only result in fetching other events we
            // knew about. This is slightly incorrect in the presence of
            // network splits, but this has shown to be Good Enough™.
            //
            // Note: `&&` binds tighter than `||`, so the condition reads as
            // `(not limited AND has events) OR all duplicates`.
            if !timeline.limited && self.state.room_linked_chunk.events().next().is_some()
                || all_duplicates
            {
                prev_batch = None;
            }

            if prev_batch.is_some() {
                // Sad time: there's a gap, somewhere, in the timeline, and there's at least one
                // non-duplicated event. We don't know which threads might be gappy, so we
                // must invalidate them all :(
                // TODO(bnjbvr): figure out a better catchup mechanism for threads.
                let mut summaries_to_update = Vec::new();

                for (thread_root, thread) in self.state.threads.iter_mut() {
                    // Empty the thread's linked chunk.
                    thread.clear();

                    summaries_to_update.push(thread_root.clone());
                }

                // Now, update the summaries to indicate that we're not sure what the latest
                // thread event is. The thread count can remain as is, as it might still be
                // valid, and there's no good value to reset it to, anyways.
                for thread_root in summaries_to_update {
                    let Some((location, mut target_event)) = self.find_event(&thread_root).await?
                    else {
                        trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
                        continue;
                    };

                    // Only events that already have a thread summary are rewritten.
                    if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
                        prev_summary.latest_reply = None;

                        target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);

                        self.replace_event_at(location, target_event).await?;
                    }
                }
            }

            if all_duplicates {
                // No new events and no gap (per the previous check), thus no need to change the
                // room state. We're done!
                return Ok((false, Vec::new()));
            }

            let has_new_gap = prev_batch.is_some();

            // If we've never waited for an initial previous-batch token, and we've now
            // inserted a gap, no need to wait for a previous-batch token later.
            if !self.state.waited_for_initial_prev_token.load(Ordering::SeqCst) && has_new_gap {
                self.state.waited_for_initial_prev_token.store(true, Ordering::SeqCst);
            }

            // Remove the old duplicated events.
            //
            // We don't have to worry the removals can change the position of the existing
            // events, because we are pushing all _new_ `events` at the back.
            self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;

            self.state
                .room_linked_chunk
                .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);

            self.post_process_new_events(events, true).await?;

            if timeline.limited && has_new_gap {
                // If there was a previous batch token for a limited timeline, unload the chunks
                // so it only contains the last one; otherwise, there might be a
                // valid gap in between, and observers may not render it (yet).
                //
                // We must do this *after* persisting these events to storage (in
                // `post_process_new_events`).
                self.shrink_to_last_chunk().await?;
            }

            let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

            Ok((has_new_gap, timeline_event_diffs))
        }
1722
1723        /// Handle the result of a single back-pagination request.
1724        ///
1725        /// If the `prev_token` is set, then this function will check that the
1726        /// corresponding gap is present in the in-memory linked chunk.
1727        /// If it's not the case, `Ok(None)` will be returned, and the
1728        /// caller may decide to do something based on that (e.g. restart a
1729        /// pagination).
1730        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1731        pub async fn handle_backpagination(
1732            &mut self,
1733            events: Vec<Event>,
1734            mut new_token: Option<String>,
1735            prev_token: Option<String>,
1736        ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
1737        {
1738            // Check that the previous token still exists; otherwise it's a sign that the
1739            // room's timeline has been cleared.
1740            let prev_gap_id = if let Some(token) = prev_token {
1741                // Find the corresponding gap in the in-memory linked chunk.
1742                let gap_chunk_id = self.state.room_linked_chunk.chunk_identifier(|chunk| {
1743                    matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
1744                });
1745
1746                if gap_chunk_id.is_none() {
1747                    // We got a previous-batch token from the linked chunk *before* running the
1748                    // request, but it is missing *after* completing the request.
1749                    //
1750                    // It may be a sign the linked chunk has been reset, but it's fine, per this
1751                    // function's contract.
1752                    return Ok(None);
1753                }
1754
1755                gap_chunk_id
1756            } else {
1757                None
1758            };
1759
1760            let DeduplicationOutcome {
1761                all_events: mut events,
1762                in_memory_duplicated_event_ids,
1763                in_store_duplicated_event_ids,
1764                non_empty_all_duplicates: all_duplicates,
1765            } = filter_duplicate_events(
1766                &self.store,
1767                LinkedChunkId::Room(&self.state.room_id),
1768                &self.state.room_linked_chunk,
1769                events,
1770            )
1771            .await?;
1772
1773            // If not all the events have been back-paginated, we need to remove the
1774            // previous ones, otherwise we can end up with misordered events.
1775            //
1776            // Consider the following scenario:
1777            // - sync returns [D, E, F]
1778            // - then sync returns [] with a previous batch token PB1, so the internal
1779            //   linked chunk state is [D, E, F, PB1].
1780            // - back-paginating with PB1 may return [A, B, C, D, E, F].
1781            //
1782            // Only inserting the new events when replacing PB1 would result in a timeline
1783            // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
1784            // all the events, in case this happens (see also #4746).
1785
1786            if !all_duplicates {
1787                // Let's forget all the previous events.
1788                self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1789                    .await?;
1790            } else {
1791                // All new events are duplicated, they can all be ignored.
1792                events.clear();
1793                // The gap can be ditched too, as it won't be useful to backpaginate any
1794                // further.
1795                new_token = None;
1796            }
1797
1798            // `/messages` has been called with `dir=b` (backwards), so the events are in
1799            // the inverted order; reorder them.
1800            let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();
1801
1802            let new_gap = new_token.map(|prev_token| Gap { prev_token });
1803            let reached_start = self.state.room_linked_chunk.finish_back_pagination(
1804                prev_gap_id,
1805                new_gap,
1806                &topo_ordered_events,
1807            );
1808
1809            // Note: this flushes updates to the store.
1810            self.post_process_new_events(topo_ordered_events, false).await?;
1811
1812            let event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1813
1814            Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
1815        }
1816
1817        /// Subscribe to thread for a given root event, and get a (maybe empty)
1818        /// initially known list of events for that thread.
1819        pub fn subscribe_to_thread(
1820            &mut self,
1821            root: OwnedEventId,
1822        ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
1823            self.get_or_reload_thread(root).subscribe()
1824        }
1825
1826        /// Back paginate in the given thread.
1827        ///
1828        /// Will always start from the end, unless we previously paginated.
1829        pub fn finish_thread_network_pagination(
1830            &mut self,
1831            root: OwnedEventId,
1832            prev_token: Option<String>,
1833            new_token: Option<String>,
1834            events: Vec<Event>,
1835        ) -> Option<BackPaginationOutcome> {
1836            self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
1837        }
1838
1839        pub fn load_more_thread_events_backwards(
1840            &mut self,
1841            root: OwnedEventId,
1842        ) -> LoadMoreEventsBackwardsOutcome {
1843            self.get_or_reload_thread(root).load_more_events_backwards()
1844        }
1845
1846        // --------------------------------------------
1847        // utility methods
1848        // --------------------------------------------
1849
1850        /// Post-process new events, after they have been added to the in-memory
1851        /// linked chunk.
1852        ///
1853        /// Flushes updates to disk first.
1854        pub(in super::super) async fn post_process_new_events(
1855            &mut self,
1856            events: Vec<Event>,
1857            is_sync: bool,
1858        ) -> Result<(), EventCacheError> {
1859            // Update the store before doing the post-processing.
1860            self.propagate_changes().await?;
1861
1862            let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();
1863
1864            for event in events {
1865                self.maybe_apply_new_redaction(&event).await?;
1866
1867                if self.state.enabled_thread_support {
1868                    // Only add the event to a thread if:
1869                    // - thread support is enabled,
1870                    // - and if this is a sync (we can't know where to insert backpaginated events
1871                    //   in threads).
1872                    if is_sync {
1873                        if let Some(thread_root) = extract_thread_root(event.raw()) {
1874                            new_events_by_thread
1875                                .entry(thread_root)
1876                                .or_default()
1877                                .push(event.clone());
1878                        } else if let Some(event_id) = event.event_id() {
1879                            // If we spot the root of a thread, add it to its linked chunk.
1880                            if self.state.threads.contains_key(&event_id) {
1881                                new_events_by_thread
1882                                    .entry(event_id)
1883                                    .or_default()
1884                                    .push(event.clone());
1885                            }
1886                        }
1887                    }
1888
1889                    // Look for edits that may apply to a thread; we'll process them later.
1890                    if let Some(edit_target) = extract_edit_target(event.raw()) {
1891                        // If the edited event is known, and part of a thread,
1892                        if let Some((_location, edit_target_event)) =
1893                            self.find_event(&edit_target).await?
1894                            && let Some(thread_root) = extract_thread_root(edit_target_event.raw())
1895                        {
1896                            // Mark the thread for processing, unless it was already marked as
1897                            // such.
1898                            new_events_by_thread.entry(thread_root).or_default();
1899                        }
1900                    }
1901                }
1902
1903                // Save a bundled thread event, if there was one.
1904                if let Some(bundled_thread) = event.bundled_latest_thread_event {
1905                    self.save_events([*bundled_thread]).await?;
1906                }
1907            }
1908
1909            if self.state.enabled_thread_support {
1910                self.update_threads(new_events_by_thread).await?;
1911            }
1912
1913            Ok(())
1914        }
1915
1916        fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
1917            // TODO: when there's persistent storage, try to lazily reload from disk, if
1918            // missing from memory.
1919            let room_id = self.state.room_id.clone();
1920            let linked_chunk_update_sender = self.state.linked_chunk_update_sender.clone();
1921
1922            self.state.threads.entry(root_event_id.clone()).or_insert_with(|| {
1923                ThreadEventCache::new(room_id, root_event_id, linked_chunk_update_sender)
1924            })
1925        }
1926
1927        #[instrument(skip_all)]
1928        async fn update_threads(
1929            &mut self,
1930            new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
1931        ) -> Result<(), EventCacheError> {
1932            for (thread_root, new_events) in new_events_by_thread {
1933                let thread_cache = self.get_or_reload_thread(thread_root.clone());
1934
1935                thread_cache.add_live_events(new_events);
1936
1937                let mut latest_event_id = thread_cache.latest_event_id();
1938
1939                // If there's an edit to the latest event in the thread, use the latest edit
1940                // event id as the latest event id for the thread summary.
1941                if let Some(event_id) = latest_event_id.as_ref()
1942                    && let Some((_, edits)) = self
1943                        .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
1944                        .await?
1945                    && let Some(latest_edit) = edits.last()
1946                {
1947                    latest_event_id = latest_edit.event_id();
1948                }
1949
1950                self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
1951            }
1952
1953            Ok(())
1954        }
1955
1956        /// Update a thread summary on the given thread root, if needs be.
1957        async fn maybe_update_thread_summary(
1958            &mut self,
1959            thread_root: OwnedEventId,
1960            latest_event_id: Option<OwnedEventId>,
1961        ) -> Result<(), EventCacheError> {
1962            // Add a thread summary to the (room) event which has the thread root, if we
1963            // knew about it.
1964
1965            let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
1966                trace!(%thread_root, "thread root event is missing from the room linked chunk");
1967                return Ok(());
1968            };
1969
1970            let prev_summary = target_event.thread_summary.summary();
1971
1972            // Recompute the thread summary, if needs be.
1973
1974            // Read the latest number of thread replies from the store.
1975            //
1976            // Implementation note: since this is based on the `m.relates_to` field, and
1977            // that field can only be present on room messages, we don't have to
1978            // worry about filtering out aggregation events (like
1979            // reactions/edits/etc.). Pretty neat, huh?
1980            let num_replies = {
1981                let thread_replies = self
1982                    .store
1983                    .find_event_relations(
1984                        &self.state.room_id,
1985                        &thread_root,
1986                        Some(&[RelationType::Thread]),
1987                    )
1988                    .await?;
1989                thread_replies.len().try_into().unwrap_or(u32::MAX)
1990            };
1991
1992            let new_summary = if num_replies > 0 {
1993                Some(ThreadSummary { num_replies, latest_reply: latest_event_id })
1994            } else {
1995                None
1996            };
1997
1998            if prev_summary == new_summary.as_ref() {
1999                trace!(%thread_root, "thread summary is already up-to-date");
2000                return Ok(());
2001            }
2002
2003            // Trigger an update to observers.
2004            trace!(%thread_root, "updating thread summary: {new_summary:?}");
2005            target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary);
2006            self.replace_event_at(location, target_event).await
2007        }
2008
2009        /// Replaces a single event, be it saved in memory or in the store.
2010        ///
2011        /// If it was saved in memory, this will emit a notification to
2012        /// observers that a single item has been replaced. Otherwise,
2013        /// such a notification is not emitted, because observers are
2014        /// unlikely to observe the store updates directly.
2015        pub(crate) async fn replace_event_at(
2016            &mut self,
2017            location: EventLocation,
2018            event: Event,
2019        ) -> Result<(), EventCacheError> {
2020            match location {
2021                EventLocation::Memory(position) => {
2022                    self.state
2023                        .room_linked_chunk
2024                        .replace_event_at(position, event)
2025                        .expect("should have been a valid position of an item");
2026                    // We just changed the in-memory representation; synchronize this with
2027                    // the store.
2028                    self.propagate_changes().await?;
2029                }
2030                EventLocation::Store => {
2031                    self.save_events([event]).await?;
2032                }
2033            }
2034
2035            Ok(())
2036        }
2037
        /// If the given event is a redaction, try to retrieve the
        /// to-be-redacted event in the chunk, and replace it by the
        /// redacted form.
        ///
        /// This is a no-op (returning `Ok(())`) when the event isn't a
        /// redaction, can't be deserialized, has no redaction target, when the
        /// target is unknown, or when the target is already redacted.
        ///
        /// If the redacted event belongs to a thread that has a thread cache,
        /// the event is also removed from that thread cache, and the thread
        /// summary is updated if needs be.
        #[instrument(skip_all)]
        async fn maybe_apply_new_redaction(
            &mut self,
            event: &Event,
        ) -> Result<(), EventCacheError> {
            let raw_event = event.raw();

            // Do not deserialise the entire event if we aren't certain it's a
            // `m.room.redaction`. It saves a non-negligible amount of computations.
            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
                raw_event.get_field::<MessageLikeEventType>("type")
            else {
                return Ok(());
            };

            // It is a `m.room.redaction`! We can deserialize it entirely.

            let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
                redaction,
            ))) = raw_event.deserialize()
            else {
                return Ok(());
            };

            // The location of the redaction target depends on the room version's
            // redaction rules.
            let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else {
                warn!("missing target event id from the redaction event");
                return Ok(());
            };

            // Replace the redacted event by a redacted form, if we knew about it.
            let Some((location, mut target_event)) = self.find_event(event_id).await? else {
                trace!("redacted event is missing from the linked chunk");
                return Ok(());
            };

            // Don't redact already redacted events.
            //
            // Note: if the target event can't be deserialized, the redaction is
            // still attempted below, but no thread root is extracted.
            let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
                // TODO: replace with `deserialized.is_redacted()` when
                // https://github.com/ruma/ruma/pull/2254 has been merged.
                match deserialized {
                    AnySyncTimelineEvent::MessageLike(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                    AnySyncTimelineEvent::State(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                }

                // If the event is part of a thread, update the thread linked chunk and the
                // summary.
                extract_thread_root(target_event.raw())
            } else {
                warn!("failed to deserialize the event to redact");
                None
            };

            if let Some(redacted_event) = apply_redaction(
                target_event.raw(),
                // The `type` field has been checked to be `m.room.redaction`
                // above, and the event deserialized as a redaction.
                event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
                &self.state.room_version_rules.redaction,
            ) {
                // It's safe to cast `redacted_event` here:
                // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
                //   when calling .raw(), so it's still one under the hood.
                // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
                target_event.replace_raw(redacted_event.cast_unchecked());

                self.replace_event_at(location, target_event).await?;

                // If the redacted event was part of a thread, remove it in the thread linked
                // chunk too, and make sure to update the thread root's summary
                // as well.
                //
                // Note: there is an ordering issue here: the above `replace_event_at` must
                // happen BEFORE we recompute the summary, otherwise the set of
                // replies may include the to-be-redacted event.
                if let Some(thread_root) = thread_root
                    && let Some(thread_cache) = self.state.threads.get_mut(&thread_root)
                {
                    thread_cache.remove_if_present(event_id);

                    // The number of replies may have changed, so update the thread summary if
                    // needs be.
                    let latest_event_id = thread_cache.latest_event_id();
                    self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
                }
            }

            Ok(())
        }
2135
2136        /// Save events into the database, without notifying observers.
2137        pub async fn save_events(
2138            &mut self,
2139            events: impl IntoIterator<Item = Event>,
2140        ) -> Result<(), EventCacheError> {
2141            let store = self.store.clone();
2142            let room_id = self.state.room_id.clone();
2143            let events = events.into_iter().collect::<Vec<_>>();
2144
2145            // Spawn a task so the save is uninterrupted by task cancellation.
2146            spawn(async move {
2147                for event in events {
2148                    store.save_event(&room_id, event).await?;
2149                }
2150                super::Result::Ok(())
2151            })
2152            .await
2153            .expect("joining failed")?;
2154
2155            Ok(())
2156        }
2157
        /// Test-only helper: returns what [`EventCacheStoreLockGuard::is_dirty`]
        /// reports for the store guard held by this state.
        #[cfg(test)]
        pub fn is_dirty(&self) -> bool {
            EventCacheStoreLockGuard::is_dirty(&self.store)
        }
2162    }
2163
2164    /// Load a linked chunk's full metadata, making sure the chunks are
2165    /// according to their their links.
2166    ///
2167    /// Returns `None` if there's no such linked chunk in the store, or an
2168    /// error if the linked chunk is malformed.
2169    async fn load_linked_chunk_metadata(
2170        store_guard: &EventCacheStoreLockGuard,
2171        linked_chunk_id: LinkedChunkId<'_>,
2172    ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
2173        let mut all_chunks = store_guard
2174            .load_all_chunks_metadata(linked_chunk_id)
2175            .await
2176            .map_err(EventCacheError::from)?;
2177
2178        if all_chunks.is_empty() {
2179            // There are no chunks, so there's nothing to do.
2180            return Ok(None);
2181        }
2182
2183        // Transform the vector into a hashmap, for quick lookup of the predecessors.
2184        let chunk_map: HashMap<_, _> =
2185            all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();
2186
2187        // Find a last chunk.
2188        let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
2189        let Some(last) = iter.next() else {
2190            return Err(EventCacheError::InvalidLinkedChunkMetadata {
2191                details: "no last chunk found".to_owned(),
2192            });
2193        };
2194
2195        // There must at most one last chunk.
2196        if let Some(other_last) = iter.next() {
2197            return Err(EventCacheError::InvalidLinkedChunkMetadata {
2198                details: format!(
2199                    "chunks {} and {} both claim to be last chunks",
2200                    last.identifier.index(),
2201                    other_last.identifier.index()
2202                ),
2203            });
2204        }
2205
2206        // Rewind the chain back to the first chunk, and do some checks at the same
2207        // time.
2208        let mut seen = HashSet::new();
2209        let mut current = last;
2210        loop {
2211            // If we've already seen this chunk, there's a cycle somewhere.
2212            if !seen.insert(current.identifier) {
2213                return Err(EventCacheError::InvalidLinkedChunkMetadata {
2214                    details: format!(
2215                        "cycle detected in linked chunk at {}",
2216                        current.identifier.index()
2217                    ),
2218                });
2219            }
2220
2221            let Some(prev_id) = current.previous else {
2222                // If there's no previous chunk, we're done.
2223                if seen.len() != all_chunks.len() {
2224                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
2225                        details: format!(
2226                            "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
2227                            seen.len(),
2228                            all_chunks.len()
2229                        ),
2230                    });
2231                }
2232                break;
2233            };
2234
2235            // If the previous chunk is not in the map, then it's unknown
2236            // and missing.
2237            let Some(pred_meta) = chunk_map.get(&prev_id) else {
2238                return Err(EventCacheError::InvalidLinkedChunkMetadata {
2239                    details: format!(
2240                        "missing predecessor {} chunk for {}",
2241                        prev_id.index(),
2242                        current.identifier.index()
2243                    ),
2244                });
2245            };
2246
2247            // If the previous chunk isn't connected to the next, then the link is invalid.
2248            if pred_meta.next != Some(current.identifier) {
2249                return Err(EventCacheError::InvalidLinkedChunkMetadata {
2250                    details: format!(
2251                        "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
2252                        pred_meta.identifier.index(),
2253                        pred_meta.next.map(|chunk_id| chunk_id.index()),
2254                        current.identifier.index()
2255                    ),
2256                });
2257            }
2258
2259            current = *pred_meta;
2260        }
2261
2262        // At this point, `current` is the identifier of the first chunk.
2263        //
2264        // Reorder the resulting vector, by going through the chain of `next` links, and
2265        // swapping items into their final position.
2266        //
2267        // Invariant in this loop: all items in [0..i[ are in their final, correct
2268        // position.
2269        let mut current = current.identifier;
2270        for i in 0..all_chunks.len() {
2271            // Find the target metadata.
2272            let j = all_chunks
2273                .iter()
2274                .rev()
2275                .position(|meta| meta.identifier == current)
2276                .map(|j| all_chunks.len() - 1 - j)
2277                .expect("the target chunk must be present in the metadata");
2278            if i != j {
2279                all_chunks.swap(i, j);
2280            }
2281            if let Some(next) = all_chunks[i].next {
2282                current = next;
2283            }
2284        }
2285
2286        Ok(Some(all_chunks))
2287    }
2288
2289    /// Removes the bundled relations from an event, if they were present.
2290    ///
2291    /// Only replaces the present if it contained bundled relations.
2292    fn strip_relations_if_present<T>(event: &mut Raw<T>) {
2293        // We're going to get rid of the `unsigned`/`m.relations` field, if it's
2294        // present.
2295        // Use a closure that returns an option so we can quickly short-circuit.
2296        let mut closure = || -> Option<()> {
2297            let mut val: serde_json::Value = event.deserialize_as().ok()?;
2298            let unsigned = val.get_mut("unsigned")?;
2299            let unsigned_obj = unsigned.as_object_mut()?;
2300            if unsigned_obj.remove("m.relations").is_some() {
2301                *event = Raw::new(&val).ok()?.cast_unchecked();
2302            }
2303            None
2304        };
2305        let _ = closure();
2306    }
2307
2308    fn strip_relations_from_event(ev: &mut Event) {
2309        match &mut ev.kind {
2310            TimelineEventKind::Decrypted(decrypted) => {
2311                // Remove all information about encryption info for
2312                // the bundled events.
2313                decrypted.unsigned_encryption_info = None;
2314
2315                // Remove the `unsigned`/`m.relations` field, if needs be.
2316                strip_relations_if_present(&mut decrypted.event);
2317            }
2318
2319            TimelineEventKind::UnableToDecrypt { event, .. }
2320            | TimelineEventKind::PlainText { event } => {
2321                strip_relations_if_present(event);
2322            }
2323        }
2324    }
2325
2326    /// Strips the bundled relations from a collection of events.
2327    fn strip_relations_from_events(items: &mut [Event]) {
2328        for ev in items.iter_mut() {
2329            strip_relations_from_event(ev);
2330        }
2331    }
2332
2333    /// Implementation of [`RoomEventCacheStateLockReadGuard::find_event`] and
2334    /// [`RoomEventCacheStateLockWriteGuard::find_event`].
2335    async fn find_event(
2336        event_id: &EventId,
2337        room_id: &RoomId,
2338        room_linked_chunk: &EventLinkedChunk,
2339        store: &EventCacheStoreLockGuard,
2340    ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
2341        // There are supposedly fewer events loaded in memory than in the store. Let's
2342        // start by looking up in the `EventLinkedChunk`.
2343        for (position, event) in room_linked_chunk.revents() {
2344            if event.event_id().as_deref() == Some(event_id) {
2345                return Ok(Some((EventLocation::Memory(position), event.clone())));
2346            }
2347        }
2348
2349        Ok(store.find_event(room_id, event_id).await?.map(|event| (EventLocation::Store, event)))
2350    }
2351
2352    /// Implementation of
2353    /// [`RoomEventCacheStateLockReadGuard::find_event_with_relations`] and
2354    /// [`RoomEventCacheStateLockWriteGuard::find_event_with_relations`].
2355    async fn find_event_with_relations(
2356        event_id: &EventId,
2357        room_id: &RoomId,
2358        filters: Option<Vec<RelationType>>,
2359        room_linked_chunk: &EventLinkedChunk,
2360        store: &EventCacheStoreLockGuard,
2361    ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
2362        // First, hit storage to get the target event and its related events.
2363        let found = store.find_event(room_id, event_id).await?;
2364
2365        let Some(target) = found else {
2366            // We haven't found the event: return early.
2367            return Ok(None);
2368        };
2369
2370        // Then, find the transitive closure of all the related events.
2371        let related =
2372            find_event_relations(event_id, room_id, filters, room_linked_chunk, store).await?;
2373
2374        Ok(Some((target, related)))
2375    }
2376
2377    /// Implementation of
2378    /// [`RoomEventCacheStateLockReadGuard::find_event_relations`].
2379    async fn find_event_relations(
2380        event_id: &EventId,
2381        room_id: &RoomId,
2382        filters: Option<Vec<RelationType>>,
2383        room_linked_chunk: &EventLinkedChunk,
2384        store: &EventCacheStoreLockGuard,
2385    ) -> Result<Vec<Event>, EventCacheError> {
2386        // Initialize the stack with all the related events, to find the
2387        // transitive closure of all the related events.
2388        let mut related = store.find_event_relations(room_id, event_id, filters.as_deref()).await?;
2389        let mut stack =
2390            related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();
2391
2392        // Also keep track of already seen events, in case there's a loop in the
2393        // relation graph.
2394        let mut already_seen = HashSet::new();
2395        already_seen.insert(event_id.to_owned());
2396
2397        let mut num_iters = 1;
2398
2399        // Find the related event for each previously-related event.
2400        while let Some(event_id) = stack.pop() {
2401            if !already_seen.insert(event_id.clone()) {
2402                // Skip events we've already seen.
2403                continue;
2404            }
2405
2406            let other_related =
2407                store.find_event_relations(room_id, &event_id, filters.as_deref()).await?;
2408
2409            stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
2410            related.extend(other_related);
2411
2412            num_iters += 1;
2413        }
2414
2415        trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
2416
2417        // Sort the results by their positions in the linked chunk, if available.
2418        //
2419        // If an event doesn't have a known position, it goes to the start of the array.
2420        related.sort_by(|(_, lhs), (_, rhs)| {
2421            use std::cmp::Ordering;
2422
2423            match (lhs, rhs) {
2424                (None, None) => Ordering::Equal,
2425                (None, Some(_)) => Ordering::Less,
2426                (Some(_), None) => Ordering::Greater,
2427                (Some(lhs), Some(rhs)) => {
2428                    let lhs = room_linked_chunk.event_order(*lhs);
2429                    let rhs = room_linked_chunk.event_order(*rhs);
2430
2431                    // The events should have a definite position, but in the case they don't,
2432                    // still consider that not having a position means you'll end at the start
2433                    // of the array.
2434                    match (lhs, rhs) {
2435                        (None, None) => Ordering::Equal,
2436                        (None, Some(_)) => Ordering::Less,
2437                        (Some(_), None) => Ordering::Greater,
2438                        (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
2439                    }
2440                }
2441            }
2442        });
2443
2444        // Keep only the events, not their positions.
2445        let related = related.into_iter().map(|(event, _pos)| event).collect();
2446
2447        Ok(related)
2448    }
2449}
2450
/// An enum representing where an event has been found when looking it up in
/// the room event cache.
pub(super) enum EventLocation {
    /// Event lives in memory (and likely in the store!).
    ///
    /// Carries the position at which the event was found in the in-memory
    /// linked chunk.
    Memory(Position),

    /// Event lives in the store only, it has not been loaded in memory yet.
    Store,
}
2459
2460pub(super) use private::RoomEventCacheStateLock;
2461
2462#[cfg(test)]
2463mod tests {
2464    use matrix_sdk_base::event_cache::Event;
2465    use matrix_sdk_test::{async_test, event_factory::EventFactory};
2466    use ruma::{
2467        RoomId, event_id,
2468        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2469        room_id, user_id,
2470    };
2471
2472    use crate::test_utils::logged_in_client;
2473
2474    #[async_test]
2475    async fn test_find_event_by_id_with_edit_relation() {
2476        let original_id = event_id!("$original");
2477        let related_id = event_id!("$related");
2478        let room_id = room_id!("!galette:saucisse.bzh");
2479        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2480
2481        assert_relations(
2482            room_id,
2483            f.text_msg("Original event").event_id(original_id).into(),
2484            f.text_msg("* An edited event")
2485                .edit(
2486                    original_id,
2487                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
2488                )
2489                .event_id(related_id)
2490                .into(),
2491            f,
2492        )
2493        .await;
2494    }
2495
2496    #[async_test]
2497    async fn test_find_event_by_id_with_thread_reply_relation() {
2498        let original_id = event_id!("$original");
2499        let related_id = event_id!("$related");
2500        let room_id = room_id!("!galette:saucisse.bzh");
2501        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2502
2503        assert_relations(
2504            room_id,
2505            f.text_msg("Original event").event_id(original_id).into(),
2506            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
2507            f,
2508        )
2509        .await;
2510    }
2511
2512    #[async_test]
2513    async fn test_find_event_by_id_with_reaction_relation() {
2514        let original_id = event_id!("$original");
2515        let related_id = event_id!("$related");
2516        let room_id = room_id!("!galette:saucisse.bzh");
2517        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2518
2519        assert_relations(
2520            room_id,
2521            f.text_msg("Original event").event_id(original_id).into(),
2522            f.reaction(original_id, ":D").event_id(related_id).into(),
2523            f,
2524        )
2525        .await;
2526    }
2527
2528    #[async_test]
2529    async fn test_find_event_by_id_with_poll_response_relation() {
2530        let original_id = event_id!("$original");
2531        let related_id = event_id!("$related");
2532        let room_id = room_id!("!galette:saucisse.bzh");
2533        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2534
2535        assert_relations(
2536            room_id,
2537            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2538                .event_id(original_id)
2539                .into(),
2540            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
2541            f,
2542        )
2543        .await;
2544    }
2545
2546    #[async_test]
2547    async fn test_find_event_by_id_with_poll_end_relation() {
2548        let original_id = event_id!("$original");
2549        let related_id = event_id!("$related");
2550        let room_id = room_id!("!galette:saucisse.bzh");
2551        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2552
2553        assert_relations(
2554            room_id,
2555            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2556                .event_id(original_id)
2557                .into(),
2558            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
2559            f,
2560        )
2561        .await;
2562    }
2563
2564    #[async_test]
2565    async fn test_find_event_by_id_with_filtered_relationships() {
2566        let original_id = event_id!("$original");
2567        let related_id = event_id!("$related");
2568        let associated_related_id = event_id!("$recursive_related");
2569        let room_id = room_id!("!galette:saucisse.bzh");
2570        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2571
2572        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2573        let related_event = event_factory
2574            .text_msg("* Edited event")
2575            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2576            .event_id(related_id)
2577            .into();
2578        let associated_related_event =
2579            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
2580
2581        let client = logged_in_client(None).await;
2582
2583        let event_cache = client.event_cache();
2584        event_cache.subscribe().unwrap();
2585
2586        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2587        let room = client.get_room(room_id).unwrap();
2588
2589        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2590
2591        // Save the original event.
2592        room_event_cache.save_events([original_event]).await;
2593
2594        // Save the related event.
2595        room_event_cache.save_events([related_event]).await;
2596
2597        // Save the associated related event, which redacts the related event.
2598        room_event_cache.save_events([associated_related_event]).await;
2599
2600        let filter = Some(vec![RelationType::Replacement]);
2601        let (event, related_events) = room_event_cache
2602            .find_event_with_relations(original_id, filter)
2603            .await
2604            .expect("Failed to find the event with relations")
2605            .expect("Event has no relation");
2606        // Fetched event is the right one.
2607        let cached_event_id = event.event_id().unwrap();
2608        assert_eq!(cached_event_id, original_id);
2609
2610        // There's only the edit event (an edit event can't have its own edit event).
2611        assert_eq!(related_events.len(), 1);
2612
2613        let related_event_id = related_events[0].event_id().unwrap();
2614        assert_eq!(related_event_id, related_id);
2615
2616        // Now we'll filter threads instead, there should be no related events
2617        let filter = Some(vec![RelationType::Thread]);
2618        let (event, related_events) = room_event_cache
2619            .find_event_with_relations(original_id, filter)
2620            .await
2621            .expect("Failed to find the event with relations")
2622            .expect("Event has no relation");
2623
2624        // Fetched event is the right one.
2625        let cached_event_id = event.event_id().unwrap();
2626        assert_eq!(cached_event_id, original_id);
2627        // No Thread related events found
2628        assert!(related_events.is_empty());
2629    }
2630
2631    #[async_test]
2632    async fn test_find_event_by_id_with_recursive_relation() {
2633        let original_id = event_id!("$original");
2634        let related_id = event_id!("$related");
2635        let associated_related_id = event_id!("$recursive_related");
2636        let room_id = room_id!("!galette:saucisse.bzh");
2637        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2638
2639        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2640        let related_event = event_factory
2641            .text_msg("* Edited event")
2642            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2643            .event_id(related_id)
2644            .into();
2645        let associated_related_event =
2646            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
2647
2648        let client = logged_in_client(None).await;
2649
2650        let event_cache = client.event_cache();
2651        event_cache.subscribe().unwrap();
2652
2653        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2654        let room = client.get_room(room_id).unwrap();
2655
2656        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2657
2658        // Save the original event.
2659        room_event_cache.save_events([original_event]).await;
2660
2661        // Save the related event.
2662        room_event_cache.save_events([related_event]).await;
2663
2664        // Save the associated related event, which redacts the related event.
2665        room_event_cache.save_events([associated_related_event]).await;
2666
2667        let (event, related_events) = room_event_cache
2668            .find_event_with_relations(original_id, None)
2669            .await
2670            .expect("Failed to find the event with relations")
2671            .expect("Event has no relation");
2672        // Fetched event is the right one.
2673        let cached_event_id = event.event_id().unwrap();
2674        assert_eq!(cached_event_id, original_id);
2675
2676        // There are both the related id and the associatively related id
2677        assert_eq!(related_events.len(), 2);
2678
2679        let related_event_id = related_events[0].event_id().unwrap();
2680        assert_eq!(related_event_id, related_id);
2681        let related_event_id = related_events[1].event_id().unwrap();
2682        assert_eq!(related_event_id, associated_related_id);
2683    }
2684
2685    async fn assert_relations(
2686        room_id: &RoomId,
2687        original_event: Event,
2688        related_event: Event,
2689        event_factory: EventFactory,
2690    ) {
2691        let client = logged_in_client(None).await;
2692
2693        let event_cache = client.event_cache();
2694        event_cache.subscribe().unwrap();
2695
2696        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2697        let room = client.get_room(room_id).unwrap();
2698
2699        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2700
2701        // Save the original event.
2702        let original_event_id = original_event.event_id().unwrap();
2703        room_event_cache.save_events([original_event]).await;
2704
2705        // Save an unrelated event to check it's not in the related events list.
2706        let unrelated_id = event_id!("$2");
2707        room_event_cache
2708            .save_events([event_factory
2709                .text_msg("An unrelated event")
2710                .event_id(unrelated_id)
2711                .into()])
2712            .await;
2713
2714        // Save the related event.
2715        let related_id = related_event.event_id().unwrap();
2716        room_event_cache.save_events([related_event]).await;
2717
2718        let (event, related_events) = room_event_cache
2719            .find_event_with_relations(&original_event_id, None)
2720            .await
2721            .expect("Failed to find the event with relations")
2722            .expect("Event has no relation");
2723        // Fetched event is the right one.
2724        let cached_event_id = event.event_id().unwrap();
2725        assert_eq!(cached_event_id, original_event_id);
2726
2727        // There is only the actually related event in the related ones
2728        let related_event_id = related_events[0].event_id().unwrap();
2729        assert_eq!(related_event_id, related_id);
2730    }
2731}
2732
2733#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
2734mod timed_tests {
2735    use std::{ops::Not, sync::Arc};
2736
2737    use assert_matches::assert_matches;
2738    use assert_matches2::assert_let;
2739    use eyeball_im::VectorDiff;
2740    use futures_util::FutureExt;
2741    use matrix_sdk_base::{
2742        event_cache::{
2743            Gap,
2744            store::{EventCacheStore as _, MemoryStore},
2745        },
2746        linked_chunk::{
2747            ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
2748            lazy_loader::from_all_chunks,
2749        },
2750        store::StoreConfig,
2751        sync::{JoinedRoomUpdate, Timeline},
2752    };
2753    use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
2754    use ruma::{
2755        EventId, OwnedUserId, event_id,
2756        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2757        room_id, user_id,
2758    };
2759    use tokio::task::yield_now;
2760
2761    use super::RoomEventCacheGenericUpdate;
2762    use crate::{
2763        assert_let_timeout,
2764        event_cache::{RoomEventCache, RoomEventCacheUpdate, room::LoadMoreEventsBackwardsOutcome},
2765        test_utils::client::MockClientBuilder,
2766    };
2767
2768    #[async_test]
2769    async fn test_write_to_storage() {
2770        let room_id = room_id!("!galette:saucisse.bzh");
2771        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2772
2773        let event_cache_store = Arc::new(MemoryStore::new());
2774
2775        let client = MockClientBuilder::new(None)
2776            .on_builder(|builder| {
2777                builder.store_config(
2778                    StoreConfig::new("hodlor".to_owned())
2779                        .event_cache_store(event_cache_store.clone()),
2780                )
2781            })
2782            .build()
2783            .await;
2784
2785        let event_cache = client.event_cache();
2786
2787        // Don't forget to subscribe and like.
2788        event_cache.subscribe().unwrap();
2789
2790        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2791        let room = client.get_room(room_id).unwrap();
2792
2793        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2794        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2795
2796        // Propagate an update for a message and a prev-batch token.
2797        let timeline = Timeline {
2798            limited: true,
2799            prev_batch: Some("raclette".to_owned()),
2800            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
2801        };
2802
2803        room_event_cache
2804            .inner
2805            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2806            .await
2807            .unwrap();
2808
2809        // Just checking the generic update is correct.
2810        assert_matches!(
2811            generic_stream.recv().await,
2812            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2813                assert_eq!(expected_room_id, room_id);
2814            }
2815        );
2816        assert!(generic_stream.is_empty());
2817
2818        // Check the storage.
2819        let linked_chunk = from_all_chunks::<3, _, _>(
2820            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2821        )
2822        .unwrap()
2823        .unwrap();
2824
2825        assert_eq!(linked_chunk.chunks().count(), 2);
2826
2827        let mut chunks = linked_chunk.chunks();
2828
2829        // We start with the gap.
2830        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
2831            assert_eq!(gap.prev_token, "raclette");
2832        });
2833
2834        // Then we have the stored event.
2835        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2836            assert_eq!(events.len(), 1);
2837            let deserialized = events[0].raw().deserialize().unwrap();
2838            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
2839            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
2840        });
2841
2842        // That's all, folks!
2843        assert!(chunks.next().is_none());
2844    }
2845
2846    #[async_test]
2847    async fn test_write_to_storage_strips_bundled_relations() {
2848        let room_id = room_id!("!galette:saucisse.bzh");
2849        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2850
2851        let event_cache_store = Arc::new(MemoryStore::new());
2852
2853        let client = MockClientBuilder::new(None)
2854            .on_builder(|builder| {
2855                builder.store_config(
2856                    StoreConfig::new("hodlor".to_owned())
2857                        .event_cache_store(event_cache_store.clone()),
2858                )
2859            })
2860            .build()
2861            .await;
2862
2863        let event_cache = client.event_cache();
2864
2865        // Don't forget to subscribe and like.
2866        event_cache.subscribe().unwrap();
2867
2868        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2869        let room = client.get_room(room_id).unwrap();
2870
2871        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2872        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2873
2874        // Propagate an update for a message with bundled relations.
2875        let ev = f
2876            .text_msg("hey yo")
2877            .sender(*ALICE)
2878            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
2879            .into_event();
2880
2881        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
2882
2883        room_event_cache
2884            .inner
2885            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2886            .await
2887            .unwrap();
2888
2889        // Just checking the generic update is correct.
2890        assert_matches!(
2891            generic_stream.recv().await,
2892            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2893                assert_eq!(expected_room_id, room_id);
2894            }
2895        );
2896        assert!(generic_stream.is_empty());
2897
2898        // The in-memory linked chunk keeps the bundled relation.
2899        {
2900            let events = room_event_cache.events().await.unwrap();
2901
2902            assert_eq!(events.len(), 1);
2903
2904            let ev = events[0].raw().deserialize().unwrap();
2905            assert_let!(
2906                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
2907            );
2908
2909            let original = msg.as_original().unwrap();
2910            assert_eq!(original.content.body(), "hey yo");
2911            assert!(original.unsigned.relations.replace.is_some());
2912        }
2913
2914        // The one in storage does not.
2915        let linked_chunk = from_all_chunks::<3, _, _>(
2916            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2917        )
2918        .unwrap()
2919        .unwrap();
2920
2921        assert_eq!(linked_chunk.chunks().count(), 1);
2922
2923        let mut chunks = linked_chunk.chunks();
2924        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2925            assert_eq!(events.len(), 1);
2926
2927            let ev = events[0].raw().deserialize().unwrap();
2928            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
2929
2930            let original = msg.as_original().unwrap();
2931            assert_eq!(original.content.body(), "hey yo");
2932            assert!(original.unsigned.relations.replace.is_none());
2933        });
2934
2935        // That's all, folks!
2936        assert!(chunks.next().is_none());
2937    }
2938
2939    #[async_test]
2940    async fn test_clear() {
2941        let room_id = room_id!("!galette:saucisse.bzh");
2942        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2943
2944        let event_cache_store = Arc::new(MemoryStore::new());
2945
2946        let event_id1 = event_id!("$1");
2947        let event_id2 = event_id!("$2");
2948
2949        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2950        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2951
2952        // Prefill the store with some data.
2953        event_cache_store
2954            .handle_linked_chunk_updates(
2955                LinkedChunkId::Room(room_id),
2956                vec![
2957                    // An empty items chunk.
2958                    Update::NewItemsChunk {
2959                        previous: None,
2960                        new: ChunkIdentifier::new(0),
2961                        next: None,
2962                    },
2963                    // A gap chunk.
2964                    Update::NewGapChunk {
2965                        previous: Some(ChunkIdentifier::new(0)),
2966                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
2967                        new: ChunkIdentifier::new(42),
2968                        next: None,
2969                        gap: Gap { prev_token: "comté".to_owned() },
2970                    },
2971                    // Another items chunk, non-empty this time.
2972                    Update::NewItemsChunk {
2973                        previous: Some(ChunkIdentifier::new(42)),
2974                        new: ChunkIdentifier::new(1),
2975                        next: None,
2976                    },
2977                    Update::PushItems {
2978                        at: Position::new(ChunkIdentifier::new(1), 0),
2979                        items: vec![ev1.clone()],
2980                    },
2981                    // And another items chunk, non-empty again.
2982                    Update::NewItemsChunk {
2983                        previous: Some(ChunkIdentifier::new(1)),
2984                        new: ChunkIdentifier::new(2),
2985                        next: None,
2986                    },
2987                    Update::PushItems {
2988                        at: Position::new(ChunkIdentifier::new(2), 0),
2989                        items: vec![ev2.clone()],
2990                    },
2991                ],
2992            )
2993            .await
2994            .unwrap();
2995
2996        let client = MockClientBuilder::new(None)
2997            .on_builder(|builder| {
2998                builder.store_config(
2999                    StoreConfig::new("hodlor".to_owned())
3000                        .event_cache_store(event_cache_store.clone()),
3001                )
3002            })
3003            .build()
3004            .await;
3005
3006        let event_cache = client.event_cache();
3007
3008        // Don't forget to subscribe and like.
3009        event_cache.subscribe().unwrap();
3010
3011        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3012        let room = client.get_room(room_id).unwrap();
3013
3014        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3015
3016        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
3017        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3018
3019        // The rooms knows about all cached events.
3020        {
3021            assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
3022            assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
3023        }
3024
3025        // But only part of events are loaded from the store
3026        {
3027            // The room must contain only one event because only one chunk has been loaded.
3028            assert_eq!(items.len(), 1);
3029            assert_eq!(items[0].event_id().unwrap(), event_id2);
3030
3031            assert!(stream.is_empty());
3032        }
3033
3034        // Let's load more chunks to load all events.
3035        {
3036            room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3037
3038            assert_let_timeout!(
3039                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3040            );
3041            assert_eq!(diffs.len(), 1);
3042            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
3043                // Here you are `event_id1`!
3044                assert_eq!(event.event_id().unwrap(), event_id1);
3045            });
3046
3047            assert!(stream.is_empty());
3048
3049            assert_let_timeout!(
3050                Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
3051                    generic_stream.recv()
3052            );
3053            assert_eq!(room_id, expected_room_id);
3054            assert!(generic_stream.is_empty());
3055        }
3056
3057        // After clearing,…
3058        room_event_cache.clear().await.unwrap();
3059
3060        //… we get an update that the content has been cleared.
3061        assert_let_timeout!(
3062            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3063        );
3064        assert_eq!(diffs.len(), 1);
3065        assert_let!(VectorDiff::Clear = &diffs[0]);
3066
3067        // … same with a generic update.
3068        assert_let_timeout!(
3069            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
3070        );
3071        assert_eq!(received_room_id, room_id);
3072        assert!(generic_stream.is_empty());
3073
3074        // Events individually are not forgotten by the event cache, after clearing a
3075        // room.
3076        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
3077
3078        // But their presence in a linked chunk is forgotten.
3079        let items = room_event_cache.events().await.unwrap();
3080        assert!(items.is_empty());
3081
3082        // The event cache store too.
3083        let linked_chunk = from_all_chunks::<3, _, _>(
3084            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
3085        )
3086        .unwrap()
3087        .unwrap();
3088
3089        // Note: while the event cache store could return `None` here, clearing it will
3090        // reset it to its initial form, maintaining the invariant that it
3091        // contains a single items chunk that's empty.
3092        assert_eq!(linked_chunk.num_items(), 0);
3093    }
3094
3095    #[async_test]
3096    async fn test_load_from_storage() {
3097        let room_id = room_id!("!galette:saucisse.bzh");
3098        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
3099
3100        let event_cache_store = Arc::new(MemoryStore::new());
3101
3102        let event_id1 = event_id!("$1");
3103        let event_id2 = event_id!("$2");
3104
3105        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
3106        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
3107
3108        // Prefill the store with some data.
3109        event_cache_store
3110            .handle_linked_chunk_updates(
3111                LinkedChunkId::Room(room_id),
3112                vec![
3113                    // An empty items chunk.
3114                    Update::NewItemsChunk {
3115                        previous: None,
3116                        new: ChunkIdentifier::new(0),
3117                        next: None,
3118                    },
3119                    // A gap chunk.
3120                    Update::NewGapChunk {
3121                        previous: Some(ChunkIdentifier::new(0)),
3122                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
3123                        new: ChunkIdentifier::new(42),
3124                        next: None,
3125                        gap: Gap { prev_token: "cheddar".to_owned() },
3126                    },
3127                    // Another items chunk, non-empty this time.
3128                    Update::NewItemsChunk {
3129                        previous: Some(ChunkIdentifier::new(42)),
3130                        new: ChunkIdentifier::new(1),
3131                        next: None,
3132                    },
3133                    Update::PushItems {
3134                        at: Position::new(ChunkIdentifier::new(1), 0),
3135                        items: vec![ev1.clone()],
3136                    },
3137                    // And another items chunk, non-empty again.
3138                    Update::NewItemsChunk {
3139                        previous: Some(ChunkIdentifier::new(1)),
3140                        new: ChunkIdentifier::new(2),
3141                        next: None,
3142                    },
3143                    Update::PushItems {
3144                        at: Position::new(ChunkIdentifier::new(2), 0),
3145                        items: vec![ev2.clone()],
3146                    },
3147                ],
3148            )
3149            .await
3150            .unwrap();
3151
3152        let client = MockClientBuilder::new(None)
3153            .on_builder(|builder| {
3154                builder.store_config(
3155                    StoreConfig::new("hodlor".to_owned())
3156                        .event_cache_store(event_cache_store.clone()),
3157                )
3158            })
3159            .build()
3160            .await;
3161
3162        let event_cache = client.event_cache();
3163
3164        // Don't forget to subscribe and like.
3165        event_cache.subscribe().unwrap();
3166
3167        // Let's check whether the generic updates are received for the initialisation.
3168        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3169
3170        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3171        let room = client.get_room(room_id).unwrap();
3172
3173        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3174
3175        // The room event cache has been loaded. A generic update must have been
3176        // triggered.
3177        assert_matches!(
3178            generic_stream.recv().await,
3179            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3180                assert_eq!(room_id, expected_room_id);
3181            }
3182        );
3183        assert!(generic_stream.is_empty());
3184
3185        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
3186
3187        // The initial items contain one event because only the last chunk is loaded by
3188        // default.
3189        assert_eq!(items.len(), 1);
3190        assert_eq!(items[0].event_id().unwrap(), event_id2);
3191        assert!(stream.is_empty());
3192
3193        // The event cache knows only all events though, even if they aren't loaded.
3194        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
3195        assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
3196
3197        // Let's paginate to load more events.
3198        room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3199
3200        assert_let_timeout!(
3201            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3202        );
3203        assert_eq!(diffs.len(), 1);
3204        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
3205            assert_eq!(event.event_id().unwrap(), event_id1);
3206        });
3207
3208        assert!(stream.is_empty());
3209
3210        // A generic update is triggered too.
3211        assert_matches!(
3212            generic_stream.recv().await,
3213            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3214                assert_eq!(expected_room_id, room_id);
3215            }
3216        );
3217        assert!(generic_stream.is_empty());
3218
3219        // A new update with one of these events leads to deduplication.
3220        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
3221
3222        room_event_cache
3223            .inner
3224            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
3225            .await
3226            .unwrap();
3227
3228        // Just checking the generic update is correct. There is a duplicate event, so
3229        // no generic changes whatsoever!
3230        assert!(generic_stream.recv().now_or_never().is_none());
3231
3232        // The stream doesn't report these changes *yet*. Use the items vector given
3233        // when subscribing, to check that the items correspond to their new
3234        // positions. The duplicated item is removed (so it's not the first
3235        // element anymore), and it's added to the back of the list.
3236        let items = room_event_cache.events().await.unwrap();
3237        assert_eq!(items.len(), 2);
3238        assert_eq!(items[0].event_id().unwrap(), event_id1);
3239        assert_eq!(items[1].event_id().unwrap(), event_id2);
3240    }
3241
3242    #[async_test]
3243    async fn test_load_from_storage_resilient_to_failure() {
3244        let room_id = room_id!("!fondue:patate.ch");
3245        let event_cache_store = Arc::new(MemoryStore::new());
3246
3247        let event = EventFactory::new()
3248            .room(room_id)
3249            .sender(user_id!("@ben:saucisse.bzh"))
3250            .text_msg("foo")
3251            .event_id(event_id!("$42"))
3252            .into_event();
3253
3254        // Prefill the store with invalid data: two chunks that form a cycle.
3255        event_cache_store
3256            .handle_linked_chunk_updates(
3257                LinkedChunkId::Room(room_id),
3258                vec![
3259                    Update::NewItemsChunk {
3260                        previous: None,
3261                        new: ChunkIdentifier::new(0),
3262                        next: None,
3263                    },
3264                    Update::PushItems {
3265                        at: Position::new(ChunkIdentifier::new(0), 0),
3266                        items: vec![event],
3267                    },
3268                    Update::NewItemsChunk {
3269                        previous: Some(ChunkIdentifier::new(0)),
3270                        new: ChunkIdentifier::new(1),
3271                        next: Some(ChunkIdentifier::new(0)),
3272                    },
3273                ],
3274            )
3275            .await
3276            .unwrap();
3277
3278        let client = MockClientBuilder::new(None)
3279            .on_builder(|builder| {
3280                builder.store_config(
3281                    StoreConfig::new("holder".to_owned())
3282                        .event_cache_store(event_cache_store.clone()),
3283                )
3284            })
3285            .build()
3286            .await;
3287
3288        let event_cache = client.event_cache();
3289
3290        // Don't forget to subscribe and like.
3291        event_cache.subscribe().unwrap();
3292
3293        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3294        let room = client.get_room(room_id).unwrap();
3295
3296        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3297
3298        let items = room_event_cache.events().await.unwrap();
3299
3300        // Because the persisted content was invalid, the room store is reset: there are
3301        // no events in the cache.
3302        assert!(items.is_empty());
3303
3304        // Storage doesn't contain anything. It would also be valid that it contains a
3305        // single initial empty items chunk.
3306        let raw_chunks =
3307            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
3308        assert!(raw_chunks.is_empty());
3309    }
3310
3311    #[async_test]
3312    async fn test_no_useless_gaps() {
3313        let room_id = room_id!("!galette:saucisse.bzh");
3314
3315        let client = MockClientBuilder::new(None).build().await;
3316
3317        let event_cache = client.event_cache();
3318        event_cache.subscribe().unwrap();
3319
3320        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3321        let room = client.get_room(room_id).unwrap();
3322        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3323        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3324
3325        let f = EventFactory::new().room(room_id).sender(*ALICE);
3326
3327        // Propagate an update including a limited timeline with one message and a
3328        // prev-batch token.
3329        room_event_cache
3330            .inner
3331            .handle_joined_room_update(JoinedRoomUpdate {
3332                timeline: Timeline {
3333                    limited: true,
3334                    prev_batch: Some("raclette".to_owned()),
3335                    events: vec![f.text_msg("hey yo").into_event()],
3336                },
3337                ..Default::default()
3338            })
3339            .await
3340            .unwrap();
3341
3342        // Just checking the generic update is correct.
3343        assert_matches!(
3344            generic_stream.recv().await,
3345            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3346                assert_eq!(expected_room_id, room_id);
3347            }
3348        );
3349        assert!(generic_stream.is_empty());
3350
3351        {
3352            let mut state = room_event_cache.inner.state.write().await.unwrap();
3353
3354            let mut num_gaps = 0;
3355            let mut num_events = 0;
3356
3357            for c in state.room_linked_chunk().chunks() {
3358                match c.content() {
3359                    ChunkContent::Items(items) => num_events += items.len(),
3360                    ChunkContent::Gap(_) => num_gaps += 1,
3361                }
3362            }
3363
3364            // The limited sync unloads the chunk, so it will appear as if there are only
3365            // the events.
3366            assert_eq!(num_gaps, 0);
3367            assert_eq!(num_events, 1);
3368
3369            // But if I manually reload more of the chunk, the gap will be present.
3370            assert_matches!(
3371                state.load_more_events_backwards().await.unwrap(),
3372                LoadMoreEventsBackwardsOutcome::Gap { .. }
3373            );
3374
3375            num_gaps = 0;
3376            num_events = 0;
3377            for c in state.room_linked_chunk().chunks() {
3378                match c.content() {
3379                    ChunkContent::Items(items) => num_events += items.len(),
3380                    ChunkContent::Gap(_) => num_gaps += 1,
3381                }
3382            }
3383
3384            // The gap must have been stored.
3385            assert_eq!(num_gaps, 1);
3386            assert_eq!(num_events, 1);
3387        }
3388
3389        // Now, propagate an update for another message, but the timeline isn't limited
3390        // this time.
3391        room_event_cache
3392            .inner
3393            .handle_joined_room_update(JoinedRoomUpdate {
3394                timeline: Timeline {
3395                    limited: false,
3396                    prev_batch: Some("fondue".to_owned()),
3397                    events: vec![f.text_msg("sup").into_event()],
3398                },
3399                ..Default::default()
3400            })
3401            .await
3402            .unwrap();
3403
3404        // Just checking the generic update is correct.
3405        assert_matches!(
3406            generic_stream.recv().await,
3407            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3408                assert_eq!(expected_room_id, room_id);
3409            }
3410        );
3411        assert!(generic_stream.is_empty());
3412
3413        {
3414            let state = room_event_cache.inner.state.read().await.unwrap();
3415
3416            let mut num_gaps = 0;
3417            let mut num_events = 0;
3418
3419            for c in state.room_linked_chunk().chunks() {
3420                match c.content() {
3421                    ChunkContent::Items(items) => num_events += items.len(),
3422                    ChunkContent::Gap(gap) => {
3423                        assert_eq!(gap.prev_token, "raclette");
3424                        num_gaps += 1;
3425                    }
3426                }
3427            }
3428
3429            // There's only the previous gap, no new ones.
3430            assert_eq!(num_gaps, 1);
3431            assert_eq!(num_events, 2);
3432        }
3433    }
3434
3435    #[async_test]
3436    async fn test_shrink_to_last_chunk() {
3437        let room_id = room_id!("!galette:saucisse.bzh");
3438
3439        let client = MockClientBuilder::new(None).build().await;
3440
3441        let f = EventFactory::new().room(room_id);
3442
3443        let evid1 = event_id!("$1");
3444        let evid2 = event_id!("$2");
3445
3446        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3447        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3448
3449        // Fill the event cache store with an initial linked chunk with 2 events chunks.
3450        {
3451            client
3452                .event_cache_store()
3453                .lock()
3454                .await
3455                .expect("Could not acquire the event cache lock")
3456                .as_clean()
3457                .expect("Could not acquire a clean event cache lock")
3458                .handle_linked_chunk_updates(
3459                    LinkedChunkId::Room(room_id),
3460                    vec![
3461                        Update::NewItemsChunk {
3462                            previous: None,
3463                            new: ChunkIdentifier::new(0),
3464                            next: None,
3465                        },
3466                        Update::PushItems {
3467                            at: Position::new(ChunkIdentifier::new(0), 0),
3468                            items: vec![ev1],
3469                        },
3470                        Update::NewItemsChunk {
3471                            previous: Some(ChunkIdentifier::new(0)),
3472                            new: ChunkIdentifier::new(1),
3473                            next: None,
3474                        },
3475                        Update::PushItems {
3476                            at: Position::new(ChunkIdentifier::new(1), 0),
3477                            items: vec![ev2],
3478                        },
3479                    ],
3480                )
3481                .await
3482                .unwrap();
3483        }
3484
3485        let event_cache = client.event_cache();
3486        event_cache.subscribe().unwrap();
3487
3488        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3489        let room = client.get_room(room_id).unwrap();
3490        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3491
3492        // Sanity check: lazily loaded, so only includes one item at start.
3493        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
3494        assert_eq!(events.len(), 1);
3495        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3496        assert!(stream.is_empty());
3497
3498        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3499
3500        // Force loading the full linked chunk by back-paginating.
3501        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3502        assert_eq!(outcome.events.len(), 1);
3503        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3504        assert!(outcome.reached_start);
3505
3506        // We also get an update about the loading from the store.
3507        assert_let_timeout!(
3508            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3509        );
3510        assert_eq!(diffs.len(), 1);
3511        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3512            assert_eq!(value.event_id().as_deref(), Some(evid1));
3513        });
3514
3515        assert!(stream.is_empty());
3516
3517        // Same for the generic update.
3518        assert_let_timeout!(
3519            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
3520        );
3521        assert_eq!(expected_room_id, room_id);
3522        assert!(generic_stream.is_empty());
3523
3524        // Shrink the linked chunk to the last chunk.
3525        let diffs = room_event_cache
3526            .inner
3527            .state
3528            .write()
3529            .await
3530            .unwrap()
3531            .force_shrink_to_last_chunk()
3532            .await
3533            .expect("shrinking should succeed");
3534
3535        // We receive updates about the changes to the linked chunk.
3536        assert_eq!(diffs.len(), 2);
3537        assert_matches!(&diffs[0], VectorDiff::Clear);
3538        assert_matches!(&diffs[1], VectorDiff::Append { values} => {
3539            assert_eq!(values.len(), 1);
3540            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
3541        });
3542
3543        assert!(stream.is_empty());
3544
3545        // No generic update is sent in this case.
3546        assert!(generic_stream.is_empty());
3547
3548        // When reading the events, we do get only the last one.
3549        let events = room_event_cache.events().await.unwrap();
3550        assert_eq!(events.len(), 1);
3551        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3552
3553        // But if we back-paginate, we don't need access to network to find out about
3554        // the previous event.
3555        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3556        assert_eq!(outcome.events.len(), 1);
3557        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3558        assert!(outcome.reached_start);
3559    }
3560
3561    #[async_test]
3562    async fn test_room_ordering() {
3563        let room_id = room_id!("!galette:saucisse.bzh");
3564
3565        let client = MockClientBuilder::new(None).build().await;
3566
3567        let f = EventFactory::new().room(room_id).sender(*ALICE);
3568
3569        let evid1 = event_id!("$1");
3570        let evid2 = event_id!("$2");
3571        let evid3 = event_id!("$3");
3572
3573        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
3574        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3575        let ev3 = f.text_msg("yo").event_id(evid3).into_event();
3576
3577        // Fill the event cache store with an initial linked chunk with 2 events chunks.
3578        {
3579            client
3580                .event_cache_store()
3581                .lock()
3582                .await
3583                .expect("Could not acquire the event cache lock")
3584                .as_clean()
3585                .expect("Could not acquire a clean event cache lock")
3586                .handle_linked_chunk_updates(
3587                    LinkedChunkId::Room(room_id),
3588                    vec![
3589                        Update::NewItemsChunk {
3590                            previous: None,
3591                            new: ChunkIdentifier::new(0),
3592                            next: None,
3593                        },
3594                        Update::PushItems {
3595                            at: Position::new(ChunkIdentifier::new(0), 0),
3596                            items: vec![ev1, ev2],
3597                        },
3598                        Update::NewItemsChunk {
3599                            previous: Some(ChunkIdentifier::new(0)),
3600                            new: ChunkIdentifier::new(1),
3601                            next: None,
3602                        },
3603                        Update::PushItems {
3604                            at: Position::new(ChunkIdentifier::new(1), 0),
3605                            items: vec![ev3.clone()],
3606                        },
3607                    ],
3608                )
3609                .await
3610                .unwrap();
3611        }
3612
3613        let event_cache = client.event_cache();
3614        event_cache.subscribe().unwrap();
3615
3616        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3617        let room = client.get_room(room_id).unwrap();
3618        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3619
3620        // Initially, the linked chunk only contains the last chunk, so only ev3 is
3621        // loaded.
3622        {
3623            let state = room_event_cache.inner.state.read().await.unwrap();
3624            let room_linked_chunk = state.room_linked_chunk();
3625
3626            // But we can get the order of ev1.
3627            assert_eq!(
3628                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
3629                Some(0)
3630            );
3631
3632            // And that of ev2 as well.
3633            assert_eq!(
3634                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
3635                Some(1)
3636            );
3637
3638            // ev3, which is loaded, also has a known ordering.
3639            let mut events = room_linked_chunk.events();
3640            let (pos, ev) = events.next().unwrap();
3641            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
3642            assert_eq!(ev.event_id().as_deref(), Some(evid3));
3643            assert_eq!(room_linked_chunk.event_order(pos), Some(2));
3644
3645            // No other loaded events.
3646            assert!(events.next().is_none());
3647        }
3648
3649        // Force loading the full linked chunk by back-paginating.
3650        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3651        assert!(outcome.reached_start);
3652
3653        // All events are now loaded, so their order is precisely their enumerated index
3654        // in a linear iteration.
3655        {
3656            let state = room_event_cache.inner.state.read().await.unwrap();
3657            let room_linked_chunk = state.room_linked_chunk();
3658
3659            for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
3660                assert_eq!(room_linked_chunk.event_order(pos), Some(i));
3661            }
3662        }
3663
3664        // Handle a gappy sync with two events (including one duplicate, so
3665        // deduplication kicks in), so that the linked chunk is shrunk to the
3666        // last chunk, and that the linked chunk only contains the last two
3667        // events.
3668        let evid4 = event_id!("$4");
3669        room_event_cache
3670            .inner
3671            .handle_joined_room_update(JoinedRoomUpdate {
3672                timeline: Timeline {
3673                    limited: true,
3674                    prev_batch: Some("fondue".to_owned()),
3675                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
3676                },
3677                ..Default::default()
3678            })
3679            .await
3680            .unwrap();
3681
3682        {
3683            let state = room_event_cache.inner.state.read().await.unwrap();
3684            let room_linked_chunk = state.room_linked_chunk();
3685
3686            // After the shrink, only evid3 and evid4 are loaded.
3687            let mut events = room_linked_chunk.events();
3688
3689            let (pos, ev) = events.next().unwrap();
3690            assert_eq!(ev.event_id().as_deref(), Some(evid3));
3691            assert_eq!(room_linked_chunk.event_order(pos), Some(2));
3692
3693            let (pos, ev) = events.next().unwrap();
3694            assert_eq!(ev.event_id().as_deref(), Some(evid4));
3695            assert_eq!(room_linked_chunk.event_order(pos), Some(3));
3696
3697            // No other loaded events.
3698            assert!(events.next().is_none());
3699
3700            // But we can still get the order of previous events.
3701            assert_eq!(
3702                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
3703                Some(0)
3704            );
3705            assert_eq!(
3706                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
3707                Some(1)
3708            );
3709
3710            // ev3 doesn't have an order with its previous position, since it's been
3711            // deduplicated.
3712            assert_eq!(
3713                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
3714                None
3715            );
3716        }
3717    }
3718
3719    #[async_test]
3720    async fn test_auto_shrink_after_all_subscribers_are_gone() {
3721        let room_id = room_id!("!galette:saucisse.bzh");
3722
3723        let client = MockClientBuilder::new(None).build().await;
3724
3725        let f = EventFactory::new().room(room_id);
3726
3727        let evid1 = event_id!("$1");
3728        let evid2 = event_id!("$2");
3729
3730        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3731        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3732
3733        // Fill the event cache store with an initial linked chunk with 2 events chunks.
3734        {
3735            client
3736                .event_cache_store()
3737                .lock()
3738                .await
3739                .expect("Could not acquire the event cache lock")
3740                .as_clean()
3741                .expect("Could not acquire a clean event cache lock")
3742                .handle_linked_chunk_updates(
3743                    LinkedChunkId::Room(room_id),
3744                    vec![
3745                        Update::NewItemsChunk {
3746                            previous: None,
3747                            new: ChunkIdentifier::new(0),
3748                            next: None,
3749                        },
3750                        Update::PushItems {
3751                            at: Position::new(ChunkIdentifier::new(0), 0),
3752                            items: vec![ev1],
3753                        },
3754                        Update::NewItemsChunk {
3755                            previous: Some(ChunkIdentifier::new(0)),
3756                            new: ChunkIdentifier::new(1),
3757                            next: None,
3758                        },
3759                        Update::PushItems {
3760                            at: Position::new(ChunkIdentifier::new(1), 0),
3761                            items: vec![ev2],
3762                        },
3763                    ],
3764                )
3765                .await
3766                .unwrap();
3767        }
3768
3769        let event_cache = client.event_cache();
3770        event_cache.subscribe().unwrap();
3771
3772        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3773        let room = client.get_room(room_id).unwrap();
3774        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3775
3776        // Sanity check: lazily loaded, so only includes one item at start.
3777        let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
3778        assert_eq!(events1.len(), 1);
3779        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
3780        assert!(stream1.is_empty());
3781
3782        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3783
3784        // Force loading the full linked chunk by back-paginating.
3785        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3786        assert_eq!(outcome.events.len(), 1);
3787        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3788        assert!(outcome.reached_start);
3789
3790        // We also get an update about the loading from the store. Ignore it, for this
3791        // test's sake.
3792        assert_let_timeout!(
3793            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
3794        );
3795        assert_eq!(diffs.len(), 1);
3796        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3797            assert_eq!(value.event_id().as_deref(), Some(evid1));
3798        });
3799
3800        assert!(stream1.is_empty());
3801
3802        assert_let_timeout!(
3803            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
3804        );
3805        assert_eq!(expected_room_id, room_id);
3806        assert!(generic_stream.is_empty());
3807
3808        // Have another subscriber.
3809        // Since it's not the first one, and the previous one loaded some more events,
3810        // the second subscribers sees them all.
3811        let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
3812        assert_eq!(events2.len(), 2);
3813        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
3814        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
3815        assert!(stream2.is_empty());
3816
3817        // Drop the first stream, and wait a bit.
3818        drop(stream1);
3819        yield_now().await;
3820
3821        // The second stream remains undisturbed.
3822        assert!(stream2.is_empty());
3823
3824        // Now drop the second stream, and wait a bit.
3825        drop(stream2);
3826        yield_now().await;
3827
3828        // The linked chunk must have auto-shrunk by now.
3829
3830        {
3831            // Check the inner state: there's no more shared auto-shrinker.
3832            let state = room_event_cache.inner.state.read().await.unwrap();
3833            assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
3834        }
3835
3836        // Getting the events will only give us the latest chunk.
3837        let events3 = room_event_cache.events().await.unwrap();
3838        assert_eq!(events3.len(), 1);
3839        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
3840    }
3841
3842    #[async_test]
3843    async fn test_rfind_map_event_in_memory_by() {
3844        let user_id = user_id!("@mnt_io:matrix.org");
3845        let room_id = room_id!("!raclette:patate.ch");
3846        let client = MockClientBuilder::new(None).build().await;
3847
3848        let event_factory = EventFactory::new().room(room_id);
3849
3850        let event_id_0 = event_id!("$ev0");
3851        let event_id_1 = event_id!("$ev1");
3852        let event_id_2 = event_id!("$ev2");
3853        let event_id_3 = event_id!("$ev3");
3854
3855        let event_0 =
3856            event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3857        let event_1 =
3858            event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3859        let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3860        let event_3 =
3861            event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3862
3863        // Fill the event cache store with an initial linked chunk of 2 chunks, and 4
3864        // events.
3865        {
3866            client
3867                .event_cache_store()
3868                .lock()
3869                .await
3870                .expect("Could not acquire the event cache lock")
3871                .as_clean()
3872                .expect("Could not acquire a clean event cache lock")
3873                .handle_linked_chunk_updates(
3874                    LinkedChunkId::Room(room_id),
3875                    vec![
3876                        Update::NewItemsChunk {
3877                            previous: None,
3878                            new: ChunkIdentifier::new(0),
3879                            next: None,
3880                        },
3881                        Update::PushItems {
3882                            at: Position::new(ChunkIdentifier::new(0), 0),
3883                            items: vec![event_3],
3884                        },
3885                        Update::NewItemsChunk {
3886                            previous: Some(ChunkIdentifier::new(0)),
3887                            new: ChunkIdentifier::new(1),
3888                            next: None,
3889                        },
3890                        Update::PushItems {
3891                            at: Position::new(ChunkIdentifier::new(1), 0),
3892                            items: vec![event_0, event_1, event_2],
3893                        },
3894                    ],
3895                )
3896                .await
3897                .unwrap();
3898        }
3899
3900        let event_cache = client.event_cache();
3901        event_cache.subscribe().unwrap();
3902
3903        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3904        let room = client.get_room(room_id).unwrap();
3905        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3906
3907        // Look for an event from `BOB`: it must be `event_0`.
3908        assert_matches!(
3909            room_event_cache
3910                .rfind_map_event_in_memory_by(|event, previous_event| {
3911                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| (event.event_id(), previous_event.and_then(|event| event.event_id())))
3912                })
3913                .await,
3914            Ok(Some((event_id, previous_event_id))) => {
3915                assert_eq!(event_id.as_deref(), Some(event_id_0));
3916                assert!(previous_event_id.is_none());
3917            }
3918        );
3919
3920        // Look for an event from `ALICE`: it must be `event_2`, right before `event_1`
3921        // because events are looked for in reverse order.
3922        assert_matches!(
3923            room_event_cache
3924                .rfind_map_event_in_memory_by(|event, previous_event| {
3925                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| (event.event_id(), previous_event.and_then(|event| event.event_id())))
3926                })
3927                .await,
3928            Ok(Some((event_id, previous_event_id))) => {
3929                assert_eq!(event_id.as_deref(), Some(event_id_2));
3930                assert_eq!(previous_event_id.as_deref(), Some(event_id_1));
3931            }
3932        );
3933
3934        // Look for an event that is inside the storage, but not loaded.
3935        assert!(
3936            room_event_cache
3937                .rfind_map_event_in_memory_by(|event, _| {
3938                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
3939                        == Some(user_id))
3940                    .then(|| event.event_id())
3941                })
3942                .await
3943                .unwrap()
3944                .is_none()
3945        );
3946
3947        // Look for an event that doesn't exist.
3948        assert!(
3949            room_event_cache
3950                .rfind_map_event_in_memory_by(|_, _| None::<()>)
3951                .await
3952                .unwrap()
3953                .is_none()
3954        );
3955    }
3956
3957    #[async_test]
3958    async fn test_reload_when_dirty() {
3959        let user_id = user_id!("@mnt_io:matrix.org");
3960        let room_id = room_id!("!raclette:patate.ch");
3961
3962        // The storage shared by the two clients.
3963        let event_cache_store = MemoryStore::new();
3964
3965        // Client for the process 0.
3966        let client_p0 = MockClientBuilder::new(None)
3967            .on_builder(|builder| {
3968                builder.store_config(
3969                    StoreConfig::new("process #0".to_owned())
3970                        .event_cache_store(event_cache_store.clone()),
3971                )
3972            })
3973            .build()
3974            .await;
3975
3976        // Client for the process 1.
3977        let client_p1 = MockClientBuilder::new(None)
3978            .on_builder(|builder| {
3979                builder.store_config(
3980                    StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
3981                )
3982            })
3983            .build()
3984            .await;
3985
3986        let event_factory = EventFactory::new().room(room_id).sender(user_id);
3987
3988        let ev_id_0 = event_id!("$ev_0");
3989        let ev_id_1 = event_id!("$ev_1");
3990
3991        let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
3992        let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
3993
3994        // Add events to the storage (shared by the two clients!).
3995        client_p0
3996            .event_cache_store()
3997            .lock()
3998            .await
3999            .expect("[p0] Could not acquire the event cache lock")
4000            .as_clean()
4001            .expect("[p0] Could not acquire a clean event cache lock")
4002            .handle_linked_chunk_updates(
4003                LinkedChunkId::Room(room_id),
4004                vec![
4005                    Update::NewItemsChunk {
4006                        previous: None,
4007                        new: ChunkIdentifier::new(0),
4008                        next: None,
4009                    },
4010                    Update::PushItems {
4011                        at: Position::new(ChunkIdentifier::new(0), 0),
4012                        items: vec![ev_0],
4013                    },
4014                    Update::NewItemsChunk {
4015                        previous: Some(ChunkIdentifier::new(0)),
4016                        new: ChunkIdentifier::new(1),
4017                        next: None,
4018                    },
4019                    Update::PushItems {
4020                        at: Position::new(ChunkIdentifier::new(1), 0),
4021                        items: vec![ev_1],
4022                    },
4023                ],
4024            )
4025            .await
4026            .unwrap();
4027
4028        // Subscribe the event caches, and create the room.
4029        let (room_event_cache_p0, room_event_cache_p1) = {
4030            let event_cache_p0 = client_p0.event_cache();
4031            event_cache_p0.subscribe().unwrap();
4032
4033            let event_cache_p1 = client_p1.event_cache();
4034            event_cache_p1.subscribe().unwrap();
4035
4036            client_p0.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
4037            client_p1.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
4038
4039            let (room_event_cache_p0, _drop_handles) =
4040                client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
4041            let (room_event_cache_p1, _drop_handles) =
4042                client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
4043
4044            (room_event_cache_p0, room_event_cache_p1)
4045        };
4046
4047        // Okay. We are ready for the test!
4048        //
4049        // First off, let's check `room_event_cache_p0` has access to the first event
4050        // loaded in-memory, then do a pagination, and see more events.
4051        let mut updates_stream_p0 = {
4052            let room_event_cache = &room_event_cache_p0;
4053
4054            let (initial_updates, mut updates_stream) =
4055                room_event_cache_p0.subscribe().await.unwrap();
4056
4057            // Initial updates contain `ev_id_1` only.
4058            assert_eq!(initial_updates.len(), 1);
4059            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
4060            assert!(updates_stream.is_empty());
4061
4062            // `ev_id_1` must be loaded in memory.
4063            assert!(event_loaded(room_event_cache, ev_id_1).await);
4064
4065            // `ev_id_0` must NOT be loaded in memory.
4066            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4067
4068            // Load one more event with a backpagination.
4069            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4070
4071            // A new update for `ev_id_0` must be present.
4072            assert_matches!(
4073                updates_stream.recv().await.unwrap(),
4074                RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4075                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
4076                    assert_matches!(
4077                        &diffs[0],
4078                        VectorDiff::Insert { index: 0, value: event } => {
4079                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4080                        }
4081                    );
4082                }
4083            );
4084
4085            // `ev_id_0` must now be loaded in memory.
4086            assert!(event_loaded(room_event_cache, ev_id_0).await);
4087
4088            updates_stream
4089        };
4090
4091        // Second, let's check `room_event_cache_p1` has the same accesses.
4092        let mut updates_stream_p1 = {
4093            let room_event_cache = &room_event_cache_p1;
4094            let (initial_updates, mut updates_stream) =
4095                room_event_cache_p1.subscribe().await.unwrap();
4096
4097            // Initial updates contain `ev_id_1` only.
4098            assert_eq!(initial_updates.len(), 1);
4099            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
4100            assert!(updates_stream.is_empty());
4101
4102            // `ev_id_1` must be loaded in memory.
4103            assert!(event_loaded(room_event_cache, ev_id_1).await);
4104
4105            // `ev_id_0` must NOT be loaded in memory.
4106            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4107
4108            // Load one more event with a backpagination.
4109            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4110
4111            // A new update for `ev_id_0` must be present.
4112            assert_matches!(
4113                updates_stream.recv().await.unwrap(),
4114                RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4115                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
4116                    assert_matches!(
4117                        &diffs[0],
4118                        VectorDiff::Insert { index: 0, value: event } => {
4119                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4120                        }
4121                    );
4122                }
4123            );
4124
4125            // `ev_id_0` must now be loaded in memory.
4126            assert!(event_loaded(room_event_cache, ev_id_0).await);
4127
4128            updates_stream
4129        };
4130
4131        // Do this a couple times, for the fun.
4132        for _ in 0..3 {
4133            // Third, because `room_event_cache_p1` has locked the store, the lock
4134            // is dirty for `room_event_cache_p0`, so it will shrink to its last
4135            // chunk!
4136            {
4137                let room_event_cache = &room_event_cache_p0;
4138                let updates_stream = &mut updates_stream_p0;
4139
4140                // `ev_id_1` must be loaded in memory, just like before.
4141                assert!(event_loaded(room_event_cache, ev_id_1).await);
4142
4143                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
4144                // state has been reloaded to its last chunk.
4145                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4146
4147                // The reload can be observed via the updates too.
4148                assert_matches!(
4149                    updates_stream.recv().await.unwrap(),
4150                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4151                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4152                        assert_matches!(&diffs[0], VectorDiff::Clear);
4153                        assert_matches!(
4154                            &diffs[1],
4155                            VectorDiff::Append { values: events } => {
4156                                assert_eq!(events.len(), 1);
4157                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4158                            }
4159                        );
4160                    }
4161                );
4162
4163                // Load one more event with a backpagination.
4164                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4165
4166                // `ev_id_0` must now be loaded in memory.
4167                assert!(event_loaded(room_event_cache, ev_id_0).await);
4168
4169                // The pagination can be observed via the updates too.
4170                assert_matches!(
4171                    updates_stream.recv().await.unwrap(),
4172                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4173                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4174                        assert_matches!(
4175                            &diffs[0],
4176                            VectorDiff::Insert { index: 0, value: event } => {
4177                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4178                            }
4179                        );
4180                    }
4181                );
4182            }
4183
4184            // Fourth, because `room_event_cache_p0` has locked the store again, the lock
4185            // is dirty for `room_event_cache_p1` too!, so it will shrink to its last
4186            // chunk!
4187            {
4188                let room_event_cache = &room_event_cache_p1;
4189                let updates_stream = &mut updates_stream_p1;
4190
4191                // `ev_id_1` must be loaded in memory, just like before.
4192                assert!(event_loaded(room_event_cache, ev_id_1).await);
4193
4194                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
4195                // state has shrunk to its last chunk.
4196                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4197
4198                // The reload can be observed via the updates too.
4199                assert_matches!(
4200                    updates_stream.recv().await.unwrap(),
4201                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4202                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4203                        assert_matches!(&diffs[0], VectorDiff::Clear);
4204                        assert_matches!(
4205                            &diffs[1],
4206                            VectorDiff::Append { values: events } => {
4207                                assert_eq!(events.len(), 1);
4208                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4209                            }
4210                        );
4211                    }
4212                );
4213
4214                // Load one more event with a backpagination.
4215                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4216
4217                // `ev_id_0` must now be loaded in memory.
4218                assert!(event_loaded(room_event_cache, ev_id_0).await);
4219
4220                // The pagination can be observed via the updates too.
4221                assert_matches!(
4222                    updates_stream.recv().await.unwrap(),
4223                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4224                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4225                        assert_matches!(
4226                            &diffs[0],
4227                            VectorDiff::Insert { index: 0, value: event } => {
4228                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4229                            }
4230                        );
4231                    }
4232                );
4233            }
4234        }
4235
4236        // Repeat that with an explicit read lock (so that we don't rely on
4237        // `event_loaded` to trigger the dirty detection).
4238        for _ in 0..3 {
4239            {
4240                let room_event_cache = &room_event_cache_p0;
4241                let updates_stream = &mut updates_stream_p0;
4242
4243                let guard = room_event_cache.inner.state.read().await.unwrap();
4244
4245                // Guard is kept alive, to ensure we can have multiple read guards alive with a
4246                // shared access.
4247                // See `RoomEventCacheStateLock::read` to learn more.
4248
4249                // The lock is no longer marked as dirty, it's been cleaned.
4250                assert!(guard.is_dirty().not());
4251
4252                // The reload can be observed via the updates too.
4253                assert_matches!(
4254                    updates_stream.recv().await.unwrap(),
4255                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4256                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4257                        assert_matches!(&diffs[0], VectorDiff::Clear);
4258                        assert_matches!(
4259                            &diffs[1],
4260                            VectorDiff::Append { values: events } => {
4261                                assert_eq!(events.len(), 1);
4262                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4263                            }
4264                        );
4265                    }
4266                );
4267
4268                assert!(event_loaded(room_event_cache, ev_id_1).await);
4269                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4270
4271                // Ensure `guard` is alive up to this point (in case this test is refactored, I
4272                // want to make this super explicit).
4273                //
4274                // We need to drop it before the pagination because the pagination needs to
4275                // obtain a write lock.
4276                drop(guard);
4277
4278                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4279                assert!(event_loaded(room_event_cache, ev_id_0).await);
4280
4281                // The pagination can be observed via the updates too.
4282                assert_matches!(
4283                    updates_stream.recv().await.unwrap(),
4284                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4285                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4286                        assert_matches!(
4287                            &diffs[0],
4288                            VectorDiff::Insert { index: 0, value: event } => {
4289                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4290                            }
4291                        );
4292                    }
4293                );
4294            }
4295
4296            {
4297                let room_event_cache = &room_event_cache_p1;
4298                let updates_stream = &mut updates_stream_p1;
4299
4300                let guard = room_event_cache.inner.state.read().await.unwrap();
4301
4302                // Guard is kept alive, to ensure we can have multiple read guards alive with a
4303                // shared access.
4304
4305                // The lock is no longer marked as dirty, it's been cleaned.
4306                assert!(guard.is_dirty().not());
4307
4308                // The reload can be observed via the updates too.
4309                assert_matches!(
4310                    updates_stream.recv().await.unwrap(),
4311                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4312                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4313                        assert_matches!(&diffs[0], VectorDiff::Clear);
4314                        assert_matches!(
4315                            &diffs[1],
4316                            VectorDiff::Append { values: events } => {
4317                                assert_eq!(events.len(), 1);
4318                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4319                            }
4320                        );
4321                    }
4322                );
4323
4324                assert!(event_loaded(room_event_cache, ev_id_1).await);
4325                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4326
4327                // Ensure `guard` is alive up to this point (in case this test is refactored, I
4328                // want to make this super explicit).
4329                //
4330                // We need to drop it before the pagination because the pagination needs to
4331                // obtain a write lock.
4332                drop(guard);
4333
4334                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4335                assert!(event_loaded(room_event_cache, ev_id_0).await);
4336
4337                // The pagination can be observed via the updates too.
4338                assert_matches!(
4339                    updates_stream.recv().await.unwrap(),
4340                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4341                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4342                        assert_matches!(
4343                            &diffs[0],
4344                            VectorDiff::Insert { index: 0, value: event } => {
4345                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4346                            }
4347                        );
4348                    }
4349                );
4350            }
4351        }
4352
4353        // Repeat that with an explicit write lock.
4354        for _ in 0..3 {
4355            {
4356                let room_event_cache = &room_event_cache_p0;
4357                let updates_stream = &mut updates_stream_p0;
4358
4359                let guard = room_event_cache.inner.state.write().await.unwrap();
4360
4361                // The lock is no longer marked as dirty, it's been cleaned.
4362                assert!(guard.is_dirty().not());
4363
4364                // The reload can be observed via the updates too.
4365                assert_matches!(
4366                    updates_stream.recv().await.unwrap(),
4367                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4368                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4369                        assert_matches!(&diffs[0], VectorDiff::Clear);
4370                        assert_matches!(
4371                            &diffs[1],
4372                            VectorDiff::Append { values: events } => {
4373                                assert_eq!(events.len(), 1);
4374                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4375                            }
4376                        );
4377                    }
4378                );
4379
4380                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
4381                // needs to obtain a read lock.
4382                drop(guard);
4383
4384                assert!(event_loaded(room_event_cache, ev_id_1).await);
4385                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4386
4387                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4388                assert!(event_loaded(room_event_cache, ev_id_0).await);
4389
4390                // The pagination can be observed via the updates too.
4391                assert_matches!(
4392                    updates_stream.recv().await.unwrap(),
4393                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4394                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4395                        assert_matches!(
4396                            &diffs[0],
4397                            VectorDiff::Insert { index: 0, value: event } => {
4398                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4399                            }
4400                        );
4401                    }
4402                );
4403            }
4404
4405            {
4406                let room_event_cache = &room_event_cache_p1;
4407                let updates_stream = &mut updates_stream_p1;
4408
4409                let guard = room_event_cache.inner.state.write().await.unwrap();
4410
4411                // The lock is no longer marked as dirty, it's been cleaned.
4412                assert!(guard.is_dirty().not());
4413
4414                // The reload can be observed via the updates too.
4415                assert_matches!(
4416                    updates_stream.recv().await.unwrap(),
4417                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4418                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4419                        assert_matches!(&diffs[0], VectorDiff::Clear);
4420                        assert_matches!(
4421                            &diffs[1],
4422                            VectorDiff::Append { values: events } => {
4423                                assert_eq!(events.len(), 1);
4424                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4425                            }
4426                        );
4427                    }
4428                );
4429
4430                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
4431                // needs to obtain a read lock.
4432                drop(guard);
4433
4434                assert!(event_loaded(room_event_cache, ev_id_1).await);
4435                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4436
4437                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4438                assert!(event_loaded(room_event_cache, ev_id_0).await);
4439
4440                // The pagination can be observed via the updates too.
4441                assert_matches!(
4442                    updates_stream.recv().await.unwrap(),
4443                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4444                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4445                        assert_matches!(
4446                            &diffs[0],
4447                            VectorDiff::Insert { index: 0, value: event } => {
4448                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4449                            }
4450                        );
4451                    }
4452                );
4453            }
4454        }
4455    }
4456
4457    #[async_test]
4458    async fn test_load_when_dirty() {
4459        let room_id_0 = room_id!("!raclette:patate.ch");
4460        let room_id_1 = room_id!("!morbiflette:patate.ch");
4461
4462        // The storage shared by the two clients.
4463        let event_cache_store = MemoryStore::new();
4464
4465        // Client for the process 0.
4466        let client_p0 = MockClientBuilder::new(None)
4467            .on_builder(|builder| {
4468                builder.store_config(
4469                    StoreConfig::new("process #0".to_owned())
4470                        .event_cache_store(event_cache_store.clone()),
4471                )
4472            })
4473            .build()
4474            .await;
4475
4476        // Client for the process 1.
4477        let client_p1 = MockClientBuilder::new(None)
4478            .on_builder(|builder| {
4479                builder.store_config(
4480                    StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
4481                )
4482            })
4483            .build()
4484            .await;
4485
4486        // Subscribe the event caches, and create the room.
4487        let (room_event_cache_0_p0, room_event_cache_0_p1) = {
4488            let event_cache_p0 = client_p0.event_cache();
4489            event_cache_p0.subscribe().unwrap();
4490
4491            let event_cache_p1 = client_p1.event_cache();
4492            event_cache_p1.subscribe().unwrap();
4493
4494            client_p0
4495                .base_client()
4496                .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4497            client_p0
4498                .base_client()
4499                .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4500
4501            client_p1
4502                .base_client()
4503                .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4504            client_p1
4505                .base_client()
4506                .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4507
4508            let (room_event_cache_0_p0, _drop_handles) =
4509                client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4510            let (room_event_cache_0_p1, _drop_handles) =
4511                client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4512
4513            (room_event_cache_0_p0, room_event_cache_0_p1)
4514        };
4515
4516        // Let's make the cross-process lock over the store dirty.
4517        {
4518            drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
4519            drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
4520        }
4521
4522        // Create the `RoomEventCache` for `room_id_1`. During its creation, the
4523        // cross-process lock over the store MUST be dirty, which makes no difference as
4524        // a clean one: the state is just loaded, not reloaded.
4525        let (room_event_cache_1_p0, _) =
4526            client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
4527
4528        // Check the lock isn't dirty because it's been cleared.
4529        {
4530            let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
4531            assert!(guard.is_dirty().not());
4532        }
4533
4534        // The only way to test this behaviour is to see that the dirty block in
4535        // `RoomEventCacheStateLock` is covered by this test.
4536    }
4537
4538    async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
4539        room_event_cache
4540            .rfind_map_event_in_memory_by(|event, _previous_event_id| {
4541                (event.event_id().as_deref() == Some(event_id)).then_some(())
4542            })
4543            .await
4544            .unwrap()
4545            .is_some()
4546    }
4547}