matrix_sdk/event_cache/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All event cache types for a single room.
16
17use std::{
18    collections::BTreeMap,
19    fmt,
20    ops::{Deref, DerefMut},
21    sync::{
22        Arc,
23        atomic::{AtomicUsize, Ordering},
24    },
25};
26
27use events::sort_positions_descending;
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31    deserialized_responses::AmbiguityChange,
32    event_cache::Event,
33    linked_chunk::Position,
34    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
35};
36use ruma::{
37    EventId, OwnedEventId, OwnedRoomId,
38    api::Direction,
39    events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
40    serde::Raw,
41};
42use tokio::sync::{
43    Notify, RwLock,
44    broadcast::{Receiver, Sender},
45    mpsc,
46};
47use tracing::{instrument, trace, warn};
48
49use super::{
50    AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
51    RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
52};
53use crate::{
54    client::WeakClient,
55    event_cache::EventCacheError,
56    room::{IncludeRelations, RelationsOptions, WeakRoom},
57};
58
59pub(super) mod events;
60mod threads;
61
62pub use threads::ThreadEventCacheUpdate;
63
64/// A subset of an event cache, for a room.
65///
66/// Cloning is shallow, and thus is cheap to do.
67#[derive(Clone)]
68pub struct RoomEventCache {
69    pub(super) inner: Arc<RoomEventCacheInner>,
70}
71
72impl fmt::Debug for RoomEventCache {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        f.debug_struct("RoomEventCache").finish_non_exhaustive()
75    }
76}
77
78/// Thin wrapper for a room event cache subscriber, so as to trigger
79/// side-effects when all subscribers are gone.
80///
81/// The current side-effect is: auto-shrinking the [`RoomEventCache`] when no
82/// more subscribers are active. This is an optimisation to reduce the number of
83/// data held in memory by a [`RoomEventCache`]: when no more subscribers are
84/// active, all data are reduced to the minimum.
85///
86/// The side-effect takes effect on `Drop`.
87#[allow(missing_debug_implementations)]
88pub struct RoomEventCacheSubscriber {
89    /// Underlying receiver of the room event cache's updates.
90    recv: Receiver<RoomEventCacheUpdate>,
91
92    /// To which room are we listening?
93    room_id: OwnedRoomId,
94
95    /// Sender to the auto-shrink channel.
96    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
97
98    /// Shared instance of the auto-shrinker.
99    subscriber_count: Arc<AtomicUsize>,
100}
101
102impl Drop for RoomEventCacheSubscriber {
103    fn drop(&mut self) {
104        let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
105
106        trace!(
107            "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
108        );
109
110        if previous_subscriber_count == 1 {
111            // We were the last instance of the subscriber; let the auto-shrinker know by
112            // notifying it of our room id.
113
114            let mut room_id = self.room_id.clone();
115
116            // Try to send without waiting for channel capacity, and restart in a spin-loop
117            // if it failed (until a maximum number of attempts is reached, or
118            // the send was successful). The channel shouldn't be super busy in
119            // general, so this should resolve quickly enough.
120
121            let mut num_attempts = 0;
122
123            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
124                num_attempts += 1;
125
126                if num_attempts > 1024 {
127                    // If we've tried too many times, just give up with a warning; after all, this
128                    // is only an optimization.
129                    warn!(
130                        "couldn't send notification to the auto-shrink channel \
131                         after 1024 attempts; giving up"
132                    );
133                    return;
134                }
135
136                match err {
137                    mpsc::error::TrySendError::Full(stolen_room_id) => {
138                        room_id = stolen_room_id;
139                    }
140                    mpsc::error::TrySendError::Closed(_) => return,
141                }
142            }
143
144            trace!("sent notification to the parent channel that we were the last subscriber");
145        }
146    }
147}
148
149impl Deref for RoomEventCacheSubscriber {
150    type Target = Receiver<RoomEventCacheUpdate>;
151
152    fn deref(&self) -> &Self::Target {
153        &self.recv
154    }
155}
156
157impl DerefMut for RoomEventCacheSubscriber {
158    fn deref_mut(&mut self) -> &mut Self::Target {
159        &mut self.recv
160    }
161}
162
163impl RoomEventCache {
164    /// Create a new [`RoomEventCache`] using the given room and store.
165    pub(super) fn new(
166        client: WeakClient,
167        state: RoomEventCacheState,
168        pagination_status: SharedObservable<RoomPaginationStatus>,
169        room_id: OwnedRoomId,
170        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
171        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
172    ) -> Self {
173        Self {
174            inner: Arc::new(RoomEventCacheInner::new(
175                client,
176                state,
177                pagination_status,
178                room_id,
179                auto_shrink_sender,
180                generic_update_sender,
181            )),
182        }
183    }
184
185    /// Read all current events.
186    ///
187    /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
188    /// subscriber.
189    pub async fn events(&self) -> Vec<Event> {
190        let state = self.inner.state.read().await;
191
192        state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect()
193    }
194
195    /// Subscribe to this room updates, after getting the initial list of
196    /// events.
197    ///
198    /// Use [`RoomEventCache::events`] to get all current events without the
199    /// subscriber. Creating, and especially dropping, a
200    /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
201    pub async fn subscribe(&self) -> (Vec<Event>, RoomEventCacheSubscriber) {
202        let state = self.inner.state.read().await;
203        let events =
204            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
205
206        let previous_subscriber_count = state.subscriber_count.fetch_add(1, Ordering::SeqCst);
207        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
208
209        let recv = self.inner.sender.subscribe();
210        let subscriber = RoomEventCacheSubscriber {
211            recv,
212            room_id: self.inner.room_id.clone(),
213            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
214            subscriber_count: state.subscriber_count.clone(),
215        };
216
217        (events, subscriber)
218    }
219
220    /// Subscribe to thread for a given root event, and get a (maybe empty)
221    /// initially known list of events for that thread.
222    pub async fn subscribe_to_thread(
223        &self,
224        thread_root: OwnedEventId,
225    ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
226        let mut state = self.inner.state.write().await;
227        state.subscribe_to_thread(thread_root)
228    }
229
230    /// Paginate backwards in a thread, given its root event ID.
231    ///
232    /// Returns whether we've hit the start of the thread, in which case the
233    /// root event will be prepended to the thread.
234    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
235    pub async fn paginate_thread_backwards(
236        &self,
237        thread_root: OwnedEventId,
238        num_events: u16,
239    ) -> Result<bool> {
240        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
241
242        // Take the lock only for a short time here.
243        let mut outcome =
244            self.inner.state.write().await.load_more_thread_events_backwards(thread_root.clone());
245
246        loop {
247            match outcome {
248                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
249                    // Start a threaded pagination from this gap.
250                    let options = RelationsOptions {
251                        from: prev_token.clone(),
252                        dir: Direction::Backward,
253                        limit: Some(num_events.into()),
254                        include_relations: IncludeRelations::AllRelations,
255                        recurse: true,
256                    };
257
258                    let mut result = room
259                        .relations(thread_root.clone(), options)
260                        .await
261                        .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
262
263                    let reached_start = result.next_batch_token.is_none();
264                    trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");
265
266                    // Because the state lock is taken again in `load_or_fetch_event`, we need
267                    // to do this *before* we take the state lock again.
268                    let root_event =
269                        if reached_start {
270                            // Prepend the thread root event to the results.
271                            Some(room.load_or_fetch_event(&thread_root, None).await.map_err(
272                                |err| EventCacheError::BackpaginationError(Box::new(err)),
273                            )?)
274                        } else {
275                            None
276                        };
277
278                    let mut state = self.inner.state.write().await;
279
280                    // Save all the events (but the thread root) in the store.
281                    state.save_events(result.chunk.iter().cloned()).await?;
282
283                    // Note: the events are still in the reversed order at this point, so
284                    // pushing will eventually make it so that the root event is the first.
285                    result.chunk.extend(root_event);
286
287                    if let Some(outcome) = state.finish_thread_network_pagination(
288                        thread_root.clone(),
289                        prev_token,
290                        result.next_batch_token,
291                        result.chunk,
292                    ) {
293                        return Ok(outcome.reached_start);
294                    }
295
296                    // fallthrough: restart the pagination.
297                    outcome = state.load_more_thread_events_backwards(thread_root.clone());
298                }
299
300                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
301                    // We're done!
302                    return Ok(true);
303                }
304
305                LoadMoreEventsBackwardsOutcome::Events { .. } => {
306                    // TODO: implement :)
307                    unimplemented!("loading from disk for threads is not implemented yet");
308                }
309            }
310        }
311    }
312
313    /// Return a [`RoomPagination`] API object useful for running
314    /// back-pagination queries in the current room.
315    pub fn pagination(&self) -> RoomPagination {
316        RoomPagination { inner: self.inner.clone() }
317    }
318
319    /// Try to find a single event in this room, starting from the most recent
320    /// event.
321    ///
322    /// **Warning**! It looks into the loaded events from the in-memory linked
323    /// chunk **only**. It doesn't look inside the storage.
324    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Option<O>
325    where
326        P: FnMut(&Event) -> Option<O>,
327    {
328        self.inner.state.read().await.rfind_map_event_in_memory_by(predicate)
329    }
330
331    /// Try to find an event by ID in this room.
332    ///
333    /// It starts by looking into loaded events before looking inside the
334    /// storage.
335    pub async fn find_event(&self, event_id: &EventId) -> Option<Event> {
336        self.inner
337            .state
338            .read()
339            .await
340            .find_event(event_id)
341            .await
342            .ok()
343            .flatten()
344            .map(|(_loc, event)| event)
345    }
346
347    /// Try to find an event by ID in this room, along with its related events.
348    ///
349    /// You can filter which types of related events to retrieve using
350    /// `filter`. `None` will retrieve related events of any type.
351    ///
352    /// The related events are sorted like this:
353    ///
354    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
355    ///   located at the beginning of the array.
356    /// - events present in the linked chunk (be it in memory or in the storage)
357    ///   will be sorted according to their ordering in the linked chunk.
358    pub async fn find_event_with_relations(
359        &self,
360        event_id: &EventId,
361        filter: Option<Vec<RelationType>>,
362    ) -> Option<(Event, Vec<Event>)> {
363        // Search in all loaded or stored events.
364        self.inner
365            .state
366            .read()
367            .await
368            .find_event_with_relations(event_id, filter.clone())
369            .await
370            .ok()
371            .flatten()
372    }
373
374    /// Clear all the storage for this [`RoomEventCache`].
375    ///
376    /// This will get rid of all the events from the linked chunk and persisted
377    /// storage.
378    pub async fn clear(&self) -> Result<()> {
379        // Clear the linked chunk and persisted storage.
380        let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;
381
382        // Notify observers about the update.
383        let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
384            diffs: updates_as_vector_diffs,
385            origin: EventsOrigin::Cache,
386        });
387
388        // Notify observers about the generic update.
389        let _ = self
390            .inner
391            .generic_update_sender
392            .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });
393
394        Ok(())
395    }
396
397    /// Save some events in the event cache, for further retrieval with
398    /// [`Self::event`].
399    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
400        if let Err(err) = self.inner.state.write().await.save_events(events).await {
401            warn!("couldn't save event in the event cache: {err}");
402        }
403    }
404
405    /// Return a nice debug string (a vector of lines) for the linked chunk of
406    /// events for this room.
407    pub async fn debug_string(&self) -> Vec<String> {
408        self.inner.state.read().await.room_linked_chunk().debug_string()
409    }
410}
411
412/// The (non-cloneable) details of the `RoomEventCache`.
413pub(super) struct RoomEventCacheInner {
414    /// The room id for this room.
415    pub(super) room_id: OwnedRoomId,
416
417    pub weak_room: WeakRoom,
418
419    /// Sender part for subscribers to this room.
420    pub sender: Sender<RoomEventCacheUpdate>,
421
422    /// State for this room's event cache.
423    pub state: RwLock<RoomEventCacheState>,
424
425    /// A notifier that we received a new pagination token.
426    pub pagination_batch_token_notifier: Notify,
427
428    pub pagination_status: SharedObservable<RoomPaginationStatus>,
429
430    /// Sender to the auto-shrink channel.
431    ///
432    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
433    /// more details.
434    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
435
436    /// A clone of [`EventCacheInner::generic_update_sender`].
437    ///
438    /// Whilst `EventCacheInner` handles the generic updates from the sync, or
439    /// the storage, it doesn't handle the update from pagination. Having a
440    /// clone here allows to access it from [`RoomPagination`].
441    pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
442}
443
444impl RoomEventCacheInner {
445    /// Creates a new cache for a room, and subscribes to room updates, so as
446    /// to handle new timeline events.
447    fn new(
448        client: WeakClient,
449        state: RoomEventCacheState,
450        pagination_status: SharedObservable<RoomPaginationStatus>,
451        room_id: OwnedRoomId,
452        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
453        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
454    ) -> Self {
455        let sender = Sender::new(32);
456        let weak_room = WeakRoom::new(client, room_id);
457        Self {
458            room_id: weak_room.room_id().to_owned(),
459            weak_room,
460            state: RwLock::new(state),
461            sender,
462            pagination_batch_token_notifier: Default::default(),
463            auto_shrink_sender,
464            pagination_status,
465            generic_update_sender,
466        }
467    }
468
469    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
470        if account_data.is_empty() {
471            return;
472        }
473
474        let mut handled_read_marker = false;
475
476        trace!("Handling account data");
477
478        for raw_event in account_data {
479            match raw_event.deserialize() {
480                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
481                    // If duplicated, do not forward read marker multiple times
482                    // to avoid clutter the update channel.
483                    if handled_read_marker {
484                        continue;
485                    }
486
487                    handled_read_marker = true;
488
489                    // Propagate to observers. (We ignore the error if there aren't any.)
490                    let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
491                        event_id: ev.content.event_id,
492                    });
493                }
494
495                Ok(_) => {
496                    // We're not interested in other room account data updates,
497                    // at this point.
498                }
499
500                Err(e) => {
501                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
502                    warn!(event_type, "Failed to deserialize account data: {e}");
503                }
504            }
505        }
506    }
507
508    #[instrument(skip_all, fields(room_id = %self.room_id))]
509    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
510        self.handle_timeline(
511            updates.timeline,
512            updates.ephemeral.clone(),
513            updates.ambiguity_changes,
514        )
515        .await?;
516        self.handle_account_data(updates.account_data);
517
518        Ok(())
519    }
520
521    #[instrument(skip_all, fields(room_id = %self.room_id))]
522    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
523        self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
524
525        Ok(())
526    }
527
528    /// Handle a [`Timeline`], i.e. new events received by a sync for this
529    /// room.
530    async fn handle_timeline(
531        &self,
532        timeline: Timeline,
533        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
534        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
535    ) -> Result<()> {
536        if timeline.events.is_empty()
537            && timeline.prev_batch.is_none()
538            && ephemeral_events.is_empty()
539            && ambiguity_changes.is_empty()
540        {
541            return Ok(());
542        }
543
544        // Add all the events to the backend.
545        trace!("adding new events");
546
547        let (stored_prev_batch_token, timeline_event_diffs) =
548            self.state.write().await.handle_sync(timeline).await?;
549
550        // Now that all events have been added, we can trigger the
551        // `pagination_token_notifier`.
552        if stored_prev_batch_token {
553            self.pagination_batch_token_notifier.notify_one();
554        }
555
556        // The order matters here: first send the timeline event diffs, then only the
557        // related events (read receipts, etc.).
558        if !timeline_event_diffs.is_empty() {
559            let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
560                diffs: timeline_event_diffs,
561                origin: EventsOrigin::Sync,
562            });
563
564            let _ = self
565                .generic_update_sender
566                .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
567        }
568
569        if !ephemeral_events.is_empty() {
570            let _ = self
571                .sender
572                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
573        }
574
575        if !ambiguity_changes.is_empty() {
576            let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
577        }
578
579        Ok(())
580    }
581}
582
583/// Internal type to represent the output of
584/// [`RoomEventCacheState::load_more_events_backwards`].
585#[derive(Debug)]
586pub(super) enum LoadMoreEventsBackwardsOutcome {
587    /// A gap has been inserted.
588    Gap {
589        /// The previous batch token to be used as the "end" parameter in the
590        /// back-pagination request.
591        prev_token: Option<String>,
592    },
593
594    /// The start of the timeline has been reached.
595    StartOfTimeline,
596
597    /// Events have been inserted.
598    Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
599}
600
601// Use a private module to hide `events` to this parent module.
602mod private {
603    use std::{
604        collections::{BTreeMap, HashMap, HashSet},
605        sync::{Arc, atomic::AtomicUsize},
606    };
607
608    use eyeball::SharedObservable;
609    use eyeball_im::VectorDiff;
610    use matrix_sdk_base::{
611        apply_redaction,
612        deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
613        event_cache::{
614            Event, Gap,
615            store::{DynEventCacheStore, EventCacheStoreLock},
616        },
617        linked_chunk::{
618            ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
619            OwnedLinkedChunkId, Position, Update,
620            lazy_loader::{self},
621        },
622        serde_helpers::{extract_edit_target, extract_thread_root},
623        sync::Timeline,
624    };
625    use matrix_sdk_common::executor::spawn;
626    use ruma::{
627        EventId, OwnedEventId, OwnedRoomId,
628        events::{
629            AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
630            relation::RelationType, room::redaction::SyncRoomRedactionEvent,
631        },
632        room_version_rules::RoomVersionRules,
633        serde::Raw,
634    };
635    use tokio::sync::broadcast::{Receiver, Sender};
636    use tracing::{debug, error, instrument, trace, warn};
637
638    use super::{
639        super::{EventCacheError, deduplicator::DeduplicationOutcome},
640        EventLocation, LoadMoreEventsBackwardsOutcome,
641        events::EventLinkedChunk,
642        sort_positions_descending,
643    };
644    use crate::event_cache::{
645        BackPaginationOutcome, RoomEventCacheLinkedChunkUpdate, RoomPaginationStatus,
646        ThreadEventCacheUpdate, deduplicator::filter_duplicate_events,
647        room::threads::ThreadEventCache,
648    };
649
650    /// State for a single room's event cache.
651    ///
652    /// This contains all the inner mutable states that ought to be updated at
653    /// the same time.
654    pub struct RoomEventCacheState {
655        /// The room this state relates to.
656        room: OwnedRoomId,
657
658        /// The rules for the version of this room.
659        room_version_rules: RoomVersionRules,
660
661        /// Whether thread support has been enabled for the event cache.
662        enabled_thread_support: bool,
663
664        /// Reference to the underlying backing store.
665        store: EventCacheStoreLock,
666
667        /// The loaded events for the current room, that is, the in-memory
668        /// linked chunk for this room.
669        room_linked_chunk: EventLinkedChunk,
670
671        /// Threads present in this room.
672        ///
673        /// Keyed by the thread root event ID.
674        threads: HashMap<OwnedEventId, ThreadEventCache>,
675
676        /// Have we ever waited for a previous-batch-token to come from sync, in
677        /// the context of pagination? We do this at most once per room,
678        /// the first time we try to run backward pagination. We reset
679        /// that upon clearing the timeline events.
680        pub waited_for_initial_prev_token: bool,
681
682        pagination_status: SharedObservable<RoomPaginationStatus>,
683
684        /// See doc comment of
685        /// [`super::super::EventCacheInner::linked_chunk_update_sender`].
686        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
687
688        /// An atomic count of the current number of subscriber of the
689        /// [`super::RoomEventCache`].
690        pub(super) subscriber_count: Arc<AtomicUsize>,
691    }
692
693    impl RoomEventCacheState {
694        /// Create a new state, or reload it from storage if it's been enabled.
695        ///
696        /// Not all events are going to be loaded. Only a portion of them. The
697        /// [`EventLinkedChunk`] relies on a [`LinkedChunk`] to store all
698        /// events. Only the last chunk will be loaded. It means the
699        /// events are loaded from the most recent to the oldest. To
700        /// load more events, see [`Self::load_more_events_backwards`].
701        ///
702        /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
703        pub async fn new(
704            room_id: OwnedRoomId,
705            room_version_rules: RoomVersionRules,
706            enabled_thread_support: bool,
707            linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
708            store: EventCacheStoreLock,
709            pagination_status: SharedObservable<RoomPaginationStatus>,
710        ) -> Result<Self, EventCacheError> {
711            let store_lock = store.lock().await?;
712
713            let linked_chunk_id = LinkedChunkId::Room(&room_id);
714
715            // Load the full linked chunk's metadata, so as to feed the order tracker.
716            //
717            // If loading the full linked chunk failed, we'll clear the event cache, as it
718            // indicates that at some point, there's some malformed data.
719            let full_linked_chunk_metadata =
720                match Self::load_linked_chunk_metadata(&*store_lock, linked_chunk_id).await {
721                    Ok(metas) => metas,
722                    Err(err) => {
723                        error!(
724                            "error when loading a linked chunk's metadata from the store: {err}"
725                        );
726
727                        // Try to clear storage for this room.
728                        store_lock
729                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
730                            .await?;
731
732                        // Restart with an empty linked chunk.
733                        None
734                    }
735                };
736
737            let linked_chunk = match store_lock
738                .load_last_chunk(linked_chunk_id)
739                .await
740                .map_err(EventCacheError::from)
741                .and_then(|(last_chunk, chunk_identifier_generator)| {
742                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
743                        .map_err(EventCacheError::from)
744                }) {
745                Ok(linked_chunk) => linked_chunk,
746                Err(err) => {
747                    error!(
748                        "error when loading a linked chunk's latest chunk from the store: {err}"
749                    );
750
751                    // Try to clear storage for this room.
752                    store_lock
753                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
754                        .await?;
755
756                    None
757                }
758            };
759
760            let room_linked_chunk = EventLinkedChunk::with_initial_linked_chunk(
761                linked_chunk,
762                full_linked_chunk_metadata,
763            );
764
765            // The threads mapping is intentionally empty at start, since we're going to
766            // reload threads lazily, as soon as we need to (based on external
767            // subscribers) or when we get new information about those (from
768            // sync).
769            let threads = HashMap::new();
770
771            Ok(Self {
772                room: room_id,
773                room_version_rules,
774                enabled_thread_support,
775                store,
776                room_linked_chunk,
777                threads,
778                waited_for_initial_prev_token: false,
779                subscriber_count: Default::default(),
780                pagination_status,
781                linked_chunk_update_sender,
782            })
783        }
784
785        /// Load a linked chunk's full metadata, making sure the chunks are
786        /// according to their their links.
787        ///
788        /// Returns `None` if there's no such linked chunk in the store, or an
789        /// error if the linked chunk is malformed.
790        async fn load_linked_chunk_metadata(
791            store: &DynEventCacheStore,
792            linked_chunk_id: LinkedChunkId<'_>,
793        ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
794            let mut all_chunks = store
795                .load_all_chunks_metadata(linked_chunk_id)
796                .await
797                .map_err(EventCacheError::from)?;
798
799            if all_chunks.is_empty() {
800                // There are no chunks, so there's nothing to do.
801                return Ok(None);
802            }
803
804            // Transform the vector into a hashmap, for quick lookup of the predecessors.
805            let chunk_map: HashMap<_, _> =
806                all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();
807
808            // Find a last chunk.
809            let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
810            let Some(last) = iter.next() else {
811                return Err(EventCacheError::InvalidLinkedChunkMetadata {
812                    details: "no last chunk found".to_owned(),
813                });
814            };
815
816            // There must at most one last chunk.
817            if let Some(other_last) = iter.next() {
818                return Err(EventCacheError::InvalidLinkedChunkMetadata {
819                    details: format!(
820                        "chunks {} and {} both claim to be last chunks",
821                        last.identifier.index(),
822                        other_last.identifier.index()
823                    ),
824                });
825            }
826
827            // Rewind the chain back to the first chunk, and do some checks at the same
828            // time.
829            let mut seen = HashSet::new();
830            let mut current = last;
831            loop {
832                // If we've already seen this chunk, there's a cycle somewhere.
833                if !seen.insert(current.identifier) {
834                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
835                        details: format!(
836                            "cycle detected in linked chunk at {}",
837                            current.identifier.index()
838                        ),
839                    });
840                }
841
842                let Some(prev_id) = current.previous else {
843                    // If there's no previous chunk, we're done.
844                    if seen.len() != all_chunks.len() {
845                        return Err(EventCacheError::InvalidLinkedChunkMetadata {
846                            details: format!(
847                                "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
848                                seen.len(),
849                                all_chunks.len()
850                            ),
851                        });
852                    }
853                    break;
854                };
855
856                // If the previous chunk is not in the map, then it's unknown
857                // and missing.
858                let Some(pred_meta) = chunk_map.get(&prev_id) else {
859                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
860                        details: format!(
861                            "missing predecessor {} chunk for {}",
862                            prev_id.index(),
863                            current.identifier.index()
864                        ),
865                    });
866                };
867
868                // If the previous chunk isn't connected to the next, then the link is invalid.
869                if pred_meta.next != Some(current.identifier) {
870                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
871                        details: format!(
872                            "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
873                            pred_meta.identifier.index(),
874                            pred_meta.next.map(|chunk_id| chunk_id.index()),
875                            current.identifier.index()
876                        ),
877                    });
878                }
879
880                current = *pred_meta;
881            }
882
883            // At this point, `current` is the identifier of the first chunk.
884            //
885            // Reorder the resulting vector, by going through the chain of `next` links, and
886            // swapping items into their final position.
887            //
888            // Invariant in this loop: all items in [0..i[ are in their final, correct
889            // position.
890            let mut current = current.identifier;
891            for i in 0..all_chunks.len() {
892                // Find the target metadata.
893                let j = all_chunks
894                    .iter()
895                    .rev()
896                    .position(|meta| meta.identifier == current)
897                    .map(|j| all_chunks.len() - 1 - j)
898                    .expect("the target chunk must be present in the metadata");
899                if i != j {
900                    all_chunks.swap(i, j);
901                }
902                if let Some(next) = all_chunks[i].next {
903                    current = next;
904                }
905            }
906
907            Ok(Some(all_chunks))
908        }
909
910        /// Load more events backwards if the last chunk is **not** a gap.
911        pub(in super::super) async fn load_more_events_backwards(
912            &mut self,
913        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
914            // If any in-memory chunk is a gap, don't load more events, and let the caller
915            // resolve the gap.
916            if let Some(prev_token) = self.room_linked_chunk.rgap().map(|gap| gap.prev_token) {
917                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
918            }
919
920            let store = self.store.lock().await?;
921
922            let prev_first_chunk =
923                self.room_linked_chunk.chunks().next().expect("a linked chunk is never empty");
924
925            // The first chunk is not a gap, we can load its previous chunk.
926            let linked_chunk_id = LinkedChunkId::Room(&self.room);
927            let new_first_chunk = match store
928                .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
929                .await
930            {
931                Ok(Some(new_first_chunk)) => {
932                    // All good, let's continue with this chunk.
933                    new_first_chunk
934                }
935
936                Ok(None) => {
937                    // If we never received events for this room, this means we've never received a
938                    // sync for that room, because every room must have *at least* a room creation
939                    // event. Otherwise, we have reached the start of the timeline.
940
941                    if self.room_linked_chunk.events().next().is_some() {
942                        // If there's at least one event, this means we've reached the start of the
943                        // timeline, since the chunk is fully loaded.
944                        trace!("chunk is fully loaded and non-empty: reached_start=true");
945                        return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
946                    }
947
948                    // Otherwise, start back-pagination from the end of the room.
949                    return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: None });
950                }
951
952                Err(err) => {
953                    error!("error when loading the previous chunk of a linked chunk: {err}");
954
955                    // Clear storage for this room.
956                    store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;
957
958                    // Return the error.
959                    return Err(err.into());
960                }
961            };
962
963            let chunk_content = new_first_chunk.content.clone();
964
965            // We've reached the start on disk, if and only if, there was no chunk prior to
966            // the one we just loaded.
967            //
968            // This value is correct, if and only if, it is used for a chunk content of kind
969            // `Items`.
970            let reached_start = new_first_chunk.previous.is_none();
971
972            if let Err(err) = self.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk) {
973                error!("error when inserting the previous chunk into its linked chunk: {err}");
974
975                // Clear storage for this room.
976                store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;
977
978                // Return the error.
979                return Err(err.into());
980            }
981
982            // ⚠️ Let's not propagate the updates to the store! We already have these data
983            // in the store! Let's drain them.
984            let _ = self.room_linked_chunk.store_updates().take();
985
986            // However, we want to get updates as `VectorDiff`s.
987            let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
988
989            Ok(match chunk_content {
990                ChunkContent::Gap(gap) => {
991                    trace!("reloaded chunk from disk (gap)");
992                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
993                }
994
995                ChunkContent::Items(events) => {
996                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
997                    LoadMoreEventsBackwardsOutcome::Events {
998                        events,
999                        timeline_event_diffs,
1000                        reached_start,
1001                    }
1002                }
1003            })
1004        }
1005
1006        /// If storage is enabled, unload all the chunks, then reloads only the
1007        /// last one.
1008        ///
1009        /// If storage's enabled, return a diff update that starts with a clear
1010        /// of all events; as a result, the caller may override any
1011        /// pending diff updates with the result of this function.
1012        ///
1013        /// Otherwise, returns `None`.
1014        pub(super) async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
1015            let store_lock = self.store.lock().await?;
1016
1017            // Attempt to load the last chunk.
1018            let linked_chunk_id = LinkedChunkId::Room(&self.room);
1019            let (last_chunk, chunk_identifier_generator) =
1020                match store_lock.load_last_chunk(linked_chunk_id).await {
1021                    Ok(pair) => pair,
1022
1023                    Err(err) => {
1024                        // If loading the last chunk failed, clear the entire linked chunk.
1025                        error!("error when reloading a linked chunk from memory: {err}");
1026
1027                        // Clear storage for this room.
1028                        store_lock
1029                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
1030                            .await?;
1031
1032                        // Restart with an empty linked chunk.
1033                        (None, ChunkIdentifierGenerator::new_from_scratch())
1034                    }
1035                };
1036
1037            debug!("unloading the linked chunk, and resetting it to its last chunk");
1038
1039            // Remove all the chunks from the linked chunks, except for the last one, and
1040            // updates the chunk identifier generator.
1041            if let Err(err) =
1042                self.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
1043            {
1044                error!("error when replacing the linked chunk: {err}");
1045                return self.reset_internal().await;
1046            }
1047
1048            // Let pagination observers know that we may have not reached the start of the
1049            // timeline.
1050            // TODO: likely need to cancel any ongoing pagination.
1051            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1052
1053            // Don't propagate those updates to the store; this is only for the in-memory
1054            // representation that we're doing this. Let's drain those store updates.
1055            let _ = self.room_linked_chunk.store_updates().take();
1056
1057            Ok(())
1058        }
1059
1060        /// Automatically shrink the room if there are no more subscribers, as
1061        /// indicated by the atomic number of active subscribers.
1062        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1063        pub(crate) async fn auto_shrink_if_no_subscribers(
1064            &mut self,
1065        ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
1066            let subscriber_count = self.subscriber_count.load(std::sync::atomic::Ordering::SeqCst);
1067
1068            trace!(subscriber_count, "received request to auto-shrink");
1069
1070            if subscriber_count == 0 {
1071                // If we are the last strong reference to the auto-shrinker, we can shrink the
1072                // events data structure to its last chunk.
1073                self.shrink_to_last_chunk().await?;
1074                Ok(Some(self.room_linked_chunk.updates_as_vector_diffs()))
1075            } else {
1076                Ok(None)
1077            }
1078        }
1079
1080        #[cfg(test)]
1081        pub(crate) async fn force_shrink_to_last_chunk(
1082            &mut self,
1083        ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1084            self.shrink_to_last_chunk().await?;
1085            Ok(self.room_linked_chunk.updates_as_vector_diffs())
1086        }
1087
1088        pub(crate) fn room_event_order(&self, event_pos: Position) -> Option<usize> {
1089            self.room_linked_chunk.event_order(event_pos)
1090        }
1091
1092        /// Removes the bundled relations from an event, if they were present.
1093        ///
1094        /// Only replaces the present if it contained bundled relations.
1095        fn strip_relations_if_present<T>(event: &mut Raw<T>) {
1096            // We're going to get rid of the `unsigned`/`m.relations` field, if it's
1097            // present.
1098            // Use a closure that returns an option so we can quickly short-circuit.
1099            let mut closure = || -> Option<()> {
1100                let mut val: serde_json::Value = event.deserialize_as().ok()?;
1101                let unsigned = val.get_mut("unsigned")?;
1102                let unsigned_obj = unsigned.as_object_mut()?;
1103                if unsigned_obj.remove("m.relations").is_some() {
1104                    *event = Raw::new(&val).ok()?.cast_unchecked();
1105                }
1106                None
1107            };
1108            let _ = closure();
1109        }
1110
1111        fn strip_relations_from_event(ev: &mut Event) {
1112            match &mut ev.kind {
1113                TimelineEventKind::Decrypted(decrypted) => {
1114                    // Remove all information about encryption info for
1115                    // the bundled events.
1116                    decrypted.unsigned_encryption_info = None;
1117
1118                    // Remove the `unsigned`/`m.relations` field, if needs be.
1119                    Self::strip_relations_if_present(&mut decrypted.event);
1120                }
1121
1122                TimelineEventKind::UnableToDecrypt { event, .. }
1123                | TimelineEventKind::PlainText { event } => {
1124                    Self::strip_relations_if_present(event);
1125                }
1126            }
1127        }
1128
1129        /// Strips the bundled relations from a collection of events.
1130        fn strip_relations_from_events(items: &mut [Event]) {
1131            for ev in items.iter_mut() {
1132                Self::strip_relations_from_event(ev);
1133            }
1134        }
1135
1136        /// Remove events by their position, in `EventLinkedChunk` and in
1137        /// `EventCacheStore`.
1138        ///
1139        /// This method is purposely isolated because it must ensure that
1140        /// positions are sorted appropriately or it can be disastrous.
1141        #[instrument(skip_all)]
1142        async fn remove_events(
1143            &mut self,
1144            in_memory_events: Vec<(OwnedEventId, Position)>,
1145            in_store_events: Vec<(OwnedEventId, Position)>,
1146        ) -> Result<(), EventCacheError> {
1147            // In-store events.
1148            if !in_store_events.is_empty() {
1149                let mut positions = in_store_events
1150                    .into_iter()
1151                    .map(|(_event_id, position)| position)
1152                    .collect::<Vec<_>>();
1153
1154                sort_positions_descending(&mut positions);
1155
1156                let updates = positions
1157                    .into_iter()
1158                    .map(|pos| Update::RemoveItem { at: pos })
1159                    .collect::<Vec<_>>();
1160
1161                self.apply_store_only_updates(updates).await?;
1162            }
1163
1164            // In-memory events.
1165            if in_memory_events.is_empty() {
1166                // Nothing else to do, return early.
1167                return Ok(());
1168            }
1169
1170            // `remove_events_by_position` is responsible of sorting positions.
1171            self.room_linked_chunk
1172                .remove_events_by_position(
1173                    in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
1174                )
1175                .expect("failed to remove an event");
1176
1177            self.propagate_changes().await
1178        }
1179
1180        /// Propagate changes to the underlying storage.
1181        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1182            let updates = self.room_linked_chunk.store_updates().take();
1183            self.send_updates_to_store(updates).await
1184        }
1185
1186        /// Apply some updates that are effective only on the store itself.
1187        ///
1188        /// This method should be used only for updates that happen *outside*
1189        /// the in-memory linked chunk. Such updates must be applied
1190        /// onto the ordering tracker as well as to the persistent
1191        /// storage.
1192        async fn apply_store_only_updates(
1193            &mut self,
1194            updates: Vec<Update<Event, Gap>>,
1195        ) -> Result<(), EventCacheError> {
1196            self.room_linked_chunk.order_tracker.map_updates(&updates);
1197            self.send_updates_to_store(updates).await
1198        }
1199
1200        async fn send_updates_to_store(
1201            &mut self,
1202            mut updates: Vec<Update<Event, Gap>>,
1203        ) -> Result<(), EventCacheError> {
1204            if updates.is_empty() {
1205                return Ok(());
1206            }
1207
1208            // Strip relations from updates which insert or replace items.
1209            for update in updates.iter_mut() {
1210                match update {
1211                    Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
1212                    Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
1213                    // Other update kinds don't involve adding new events.
1214                    Update::NewItemsChunk { .. }
1215                    | Update::NewGapChunk { .. }
1216                    | Update::RemoveChunk(_)
1217                    | Update::RemoveItem { .. }
1218                    | Update::DetachLastItems { .. }
1219                    | Update::StartReattachItems
1220                    | Update::EndReattachItems
1221                    | Update::Clear => {}
1222                }
1223            }
1224
1225            // Spawn a task to make sure that all the changes are effectively forwarded to
1226            // the store, even if the call to this method gets aborted.
1227            //
1228            // The store cross-process locking involves an actual mutex, which ensures that
1229            // storing updates happens in the expected order.
1230
1231            let store = self.store.clone();
1232            let room_id = self.room.clone();
1233            let cloned_updates = updates.clone();
1234
1235            spawn(async move {
1236                let store = store.lock().await?;
1237
1238                trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
1239                let linked_chunk_id = LinkedChunkId::Room(&room_id);
1240                store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
1241                trace!("linked chunk updates applied");
1242
1243                super::Result::Ok(())
1244            })
1245            .await
1246            .expect("joining failed")?;
1247
1248            // Forward that the store got updated to observers.
1249            let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
1250                linked_chunk_id: OwnedLinkedChunkId::Room(self.room.clone()),
1251                updates,
1252            });
1253
1254            Ok(())
1255        }
1256
1257        /// Reset this data structure as if it were brand new.
1258        ///
1259        /// Return a single diff update that is a clear of all events; as a
1260        /// result, the caller may override any pending diff updates
1261        /// with the result of this function.
1262        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1263        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1264            self.reset_internal().await?;
1265
1266            let diff_updates = self.room_linked_chunk.updates_as_vector_diffs();
1267
1268            // Ensure the contract defined in the doc comment is true:
1269            debug_assert_eq!(diff_updates.len(), 1);
1270            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1271
1272            Ok(diff_updates)
1273        }
1274
1275        async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
1276            self.room_linked_chunk.reset();
1277
1278            // No need to update the thread summaries: the room events are
1279            // gone because of the
1280            // reset of `room_linked_chunk`.
1281            //
1282            // Clear the threads.
1283            for thread in self.threads.values_mut() {
1284                thread.clear();
1285            }
1286
1287            self.propagate_changes().await?;
1288
1289            // Reset the pagination state too: pretend we never waited for the initial
1290            // prev-batch token, and indicate that we're not at the start of the
1291            // timeline, since we don't know about that anymore.
1292            self.waited_for_initial_prev_token = false;
1293            // TODO: likely must cancel any ongoing back-paginations too
1294            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1295
1296            Ok(())
1297        }
1298
1299        /// Returns a read-only reference to the underlying room linked chunk.
1300        pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
1301            &self.room_linked_chunk
1302        }
1303
1304        //// Find a single event in this room, starting from the most recent event.
1305        ///
1306        /// **Warning**! It looks into the loaded events from the in-memory
1307        /// linked chunk **only**. It doesn't look inside the storage,
1308        /// contrary to [`Self::find_event`].
1309        pub fn rfind_map_event_in_memory_by<O, P>(&self, mut predicate: P) -> Option<O>
1310        where
1311            P: FnMut(&Event) -> Option<O>,
1312        {
1313            self.room_linked_chunk.revents().find_map(|(_position, event)| predicate(event))
1314        }
1315
1316        /// Find a single event in this room.
1317        ///
1318        /// It starts by looking into loaded events in `EventLinkedChunk` before
1319        /// looking inside the storage.
1320        pub async fn find_event(
1321            &self,
1322            event_id: &EventId,
1323        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1324            // There are supposedly fewer events loaded in memory than in the store. Let's
1325            // start by looking up in the `EventLinkedChunk`.
1326            for (position, event) in self.room_linked_chunk.revents() {
1327                if event.event_id().as_deref() == Some(event_id) {
1328                    return Ok(Some((EventLocation::Memory(position), event.clone())));
1329                }
1330            }
1331
1332            let store = self.store.lock().await?;
1333
1334            Ok(store
1335                .find_event(&self.room, event_id)
1336                .await?
1337                .map(|event| (EventLocation::Store, event)))
1338        }
1339
1340        /// Find an event and all its relations in the persisted storage.
1341        ///
        /// This goes straight to the database, as a simplification; we don't
        /// expect to need to look up in-memory events, nor to have all the
        /// related events loaded in memory.
1345        ///
1346        /// The related events are sorted like this:
1347        /// - events saved out-of-band with
1348        ///   [`super::RoomEventCache::save_events`] will be located at the
1349        ///   beginning of the array.
1350        /// - events present in the linked chunk (be it in memory or in the
1351        ///   database) will be sorted according to their ordering in the linked
1352        ///   chunk.
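        ///
        /// A minimal usage sketch (not compiled; assumes a `state` binding of
        /// this type and an `event_id: &EventId` in scope):
        ///
        /// ```rust,ignore
        /// // Only fetch edits (replacements) of the target event.
        /// if let Some((target, edits)) = state
        ///     .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
        ///     .await?
        /// {
        ///     // `edits` follows the ordering documented above.
        ///     let _ = (target, edits);
        /// }
        /// ```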
1353        pub async fn find_event_with_relations(
1354            &self,
1355            event_id: &EventId,
1356            filters: Option<Vec<RelationType>>,
1357        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1358            let store = self.store.lock().await?;
1359
1360            // First, hit storage to get the target event and its related events.
1361            let found = store.find_event(&self.room, event_id).await?;
1362
1363            let Some(target) = found else {
1364                // We haven't found the event: return early.
1365                return Ok(None);
1366            };
1367
1368            // Then, initialize the stack with all the related events, to find the
1369            // transitive closure of all the related events.
1370            let mut related =
1371                store.find_event_relations(&self.room, event_id, filters.as_deref()).await?;
1372            let mut stack =
1373                related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();
1374
1375            // Also keep track of already seen events, in case there's a loop in the
1376            // relation graph.
1377            let mut already_seen = HashSet::new();
1378            already_seen.insert(event_id.to_owned());
1379
1380            let mut num_iters = 1;
1381
1382            // Find the related event for each previously-related event.
1383            while let Some(event_id) = stack.pop() {
1384                if !already_seen.insert(event_id.clone()) {
1385                    // Skip events we've already seen.
1386                    continue;
1387                }
1388
1389                let other_related =
1390                    store.find_event_relations(&self.room, &event_id, filters.as_deref()).await?;
1391
1392                stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
1393                related.extend(other_related);
1394
1395                num_iters += 1;
1396            }
1397
1398            trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
1399
1400            // Sort the results by their positions in the linked chunk, if available.
1401            //
1402            // If an event doesn't have a known position, it goes to the start of the array.
1403            related.sort_by(|(_, lhs), (_, rhs)| {
1404                use std::cmp::Ordering;
1405                match (lhs, rhs) {
1406                    (None, None) => Ordering::Equal,
1407                    (None, Some(_)) => Ordering::Less,
1408                    (Some(_), None) => Ordering::Greater,
1409                    (Some(lhs), Some(rhs)) => {
1410                        let lhs = self.room_event_order(*lhs);
1411                        let rhs = self.room_event_order(*rhs);
1412
                        // The events should have a definite position but, in case they
                        // don't, consider that an event without a position ends up at
                        // the start of the array.
1416                        match (lhs, rhs) {
1417                            (None, None) => Ordering::Equal,
1418                            (None, Some(_)) => Ordering::Less,
1419                            (Some(_), None) => Ordering::Greater,
1420                            (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
1421                        }
1422                    }
1423                }
1424            });
1425
1426            // Keep only the events, not their positions.
1427            let related = related.into_iter().map(|(event, _pos)| event).collect();
1428
1429            Ok(Some((target, related)))
1430        }
1431
1432        /// Post-process new events, after they have been added to the in-memory
1433        /// linked chunk.
1434        ///
1435        /// Flushes updates to disk first.
1436        async fn post_process_new_events(
1437            &mut self,
1438            events: Vec<Event>,
1439            is_sync: bool,
1440        ) -> Result<(), EventCacheError> {
1441            // Update the store before doing the post-processing.
1442            self.propagate_changes().await?;
1443
1444            let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();
1445
1446            for event in events {
1447                self.maybe_apply_new_redaction(&event).await?;
1448
1449                if self.enabled_thread_support {
1450                    // Only add the event to a thread if:
1451                    // - thread support is enabled,
                    // - and this is a sync (we can't know where to insert backpaginated events
1453                    //   in threads).
1454                    if is_sync {
1455                        if let Some(thread_root) = extract_thread_root(event.raw()) {
1456                            new_events_by_thread
1457                                .entry(thread_root)
1458                                .or_default()
1459                                .push(event.clone());
1460                        } else if let Some(event_id) = event.event_id() {
1461                            // If we spot the root of a thread, add it to its linked chunk.
1462                            if self.threads.contains_key(&event_id) {
1463                                new_events_by_thread
1464                                    .entry(event_id)
1465                                    .or_default()
1466                                    .push(event.clone());
1467                            }
1468                        }
1469                    }
1470
1471                    // Look for edits that may apply to a thread; we'll process them later.
1472                    if let Some(edit_target) = extract_edit_target(event.raw()) {
1473                        // If the edited event is known, and part of a thread,
1474                        if let Some((_location, edit_target_event)) =
1475                            self.find_event(&edit_target).await?
1476                            && let Some(thread_root) = extract_thread_root(edit_target_event.raw())
1477                        {
1478                            // Mark the thread for processing, unless it was already marked as
1479                            // such.
1480                            new_events_by_thread.entry(thread_root).or_default();
1481                        }
1482                    }
1483                }
1484
1485                // Save a bundled thread event, if there was one.
1486                if let Some(bundled_thread) = event.bundled_latest_thread_event {
1487                    self.save_events([*bundled_thread]).await?;
1488                }
1489            }
1490
1491            if self.enabled_thread_support {
1492                self.update_threads(new_events_by_thread).await?;
1493            }
1494
1495            Ok(())
1496        }
1497
1498        fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
1499            // TODO: when there's persistent storage, try to lazily reload from disk, if
1500            // missing from memory.
1501            self.threads.entry(root_event_id.clone()).or_insert_with(|| {
1502                ThreadEventCache::new(
1503                    self.room.clone(),
1504                    root_event_id,
1505                    self.linked_chunk_update_sender.clone(),
1506                )
1507            })
1508        }
1509
1510        #[instrument(skip_all)]
1511        async fn update_threads(
1512            &mut self,
1513            new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
1514        ) -> Result<(), EventCacheError> {
1515            for (thread_root, new_events) in new_events_by_thread {
1516                let thread_cache = self.get_or_reload_thread(thread_root.clone());
1517
1518                thread_cache.add_live_events(new_events);
1519
1520                let mut latest_event_id = thread_cache.latest_event_id();
1521
1522                // If there's an edit to the latest event in the thread, use the latest edit
1523                // event id as the latest event id for the thread summary.
1524                if let Some(event_id) = latest_event_id.as_ref()
1525                    && let Some((_, edits)) = self
1526                        .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
1527                        .await?
1528                    && let Some(latest_edit) = edits.last()
1529                {
1530                    latest_event_id = latest_edit.event_id();
1531                }
1532
1533                self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
1534            }
1535
1536            Ok(())
1537        }
1538
1539        /// Update a thread summary on the given thread root, if needs be.
1540        async fn maybe_update_thread_summary(
1541            &mut self,
1542            thread_root: OwnedEventId,
1543            latest_event_id: Option<OwnedEventId>,
1544        ) -> Result<(), EventCacheError> {
            // Add a thread summary to the (room) event that is the thread root, if we
            // know about it.
1547
1548            let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
1549                trace!(%thread_root, "thread root event is missing from the room linked chunk");
1550                return Ok(());
1551            };
1552
1553            let prev_summary = target_event.thread_summary.summary();
1554
1555            // Recompute the thread summary, if needs be.
1556
1557            // Read the latest number of thread replies from the store.
1558            //
1559            // Implementation note: since this is based on the `m.relates_to` field, and
1560            // that field can only be present on room messages, we don't have to
1561            // worry about filtering out aggregation events (like
1562            // reactions/edits/etc.). Pretty neat, huh?
1563            let num_replies = {
1564                let store_guard = &*self.store.lock().await?;
1565                let thread_replies = store_guard
1566                    .find_event_relations(&self.room, &thread_root, Some(&[RelationType::Thread]))
1567                    .await?;
1568                thread_replies.len().try_into().unwrap_or(u32::MAX)
1569            };
1570
1571            let new_summary = if num_replies > 0 {
1572                Some(ThreadSummary { num_replies, latest_reply: latest_event_id })
1573            } else {
1574                None
1575            };
1576
1577            if prev_summary == new_summary.as_ref() {
1578                trace!(%thread_root, "thread summary is already up-to-date");
1579                return Ok(());
1580            }
1581
1582            // Trigger an update to observers.
1583            trace!(%thread_root, "updating thread summary: {new_summary:?}");
1584            target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary);
1585            self.replace_event_at(location, target_event).await
1586        }
1587
1588        /// Replaces a single event, be it saved in memory or in the store.
1589        ///
1590        /// If it was saved in memory, this will emit a notification to
1591        /// observers that a single item has been replaced. Otherwise,
1592        /// such a notification is not emitted, because observers are
1593        /// unlikely to observe the store updates directly.
1594        async fn replace_event_at(
1595            &mut self,
1596            location: EventLocation,
1597            event: Event,
1598        ) -> Result<(), EventCacheError> {
1599            match location {
1600                EventLocation::Memory(position) => {
1601                    self.room_linked_chunk
1602                        .replace_event_at(position, event)
1603                        .expect("should have been a valid position of an item");
1604                    // We just changed the in-memory representation; synchronize this with
1605                    // the store.
1606                    self.propagate_changes().await?;
1607                }
1608                EventLocation::Store => {
1609                    self.save_events([event]).await?;
1610                }
1611            }
1612
1613            Ok(())
1614        }
1615
1616        /// If the given event is a redaction, try to retrieve the
1617        /// to-be-redacted event in the chunk, and replace it by the
1618        /// redacted form.
1619        #[instrument(skip_all)]
1620        async fn maybe_apply_new_redaction(
1621            &mut self,
1622            event: &Event,
1623        ) -> Result<(), EventCacheError> {
1624            let raw_event = event.raw();
1625
            // Do not deserialise the entire event unless we know it's an
            // `m.room.redaction`: this saves a non-negligible amount of computation.
1628            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
1629                raw_event.get_field::<MessageLikeEventType>("type")
1630            else {
1631                return Ok(());
1632            };
1633
1634            // It is a `m.room.redaction`! We can deserialize it entirely.
1635
1636            let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
1637                redaction,
1638            ))) = raw_event.deserialize()
1639            else {
1640                return Ok(());
1641            };
1642
1643            let Some(event_id) = redaction.redacts(&self.room_version_rules.redaction) else {
1644                warn!("missing target event id from the redaction event");
1645                return Ok(());
1646            };
1647
1648            // Replace the redacted event by a redacted form, if we knew about it.
1649            let Some((location, mut target_event)) = self.find_event(event_id).await? else {
1650                trace!("redacted event is missing from the linked chunk");
1651                return Ok(());
1652            };
1653
1654            // Don't redact already redacted events.
1655            let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
1656                // TODO: replace with `deserialized.is_redacted()` when
1657                // https://github.com/ruma/ruma/pull/2254 has been merged.
1658                match deserialized {
1659                    AnySyncTimelineEvent::MessageLike(ev) => {
1660                        if ev.is_redacted() {
1661                            return Ok(());
1662                        }
1663                    }
1664                    AnySyncTimelineEvent::State(ev) => {
1665                        if ev.is_redacted() {
1666                            return Ok(());
1667                        }
1668                    }
1669                }
1670
1671                // If the event is part of a thread, update the thread linked chunk and the
1672                // summary.
1673                extract_thread_root(target_event.raw())
1674            } else {
1675                warn!("failed to deserialize the event to redact");
1676                None
1677            };
1678
1679            if let Some(redacted_event) = apply_redaction(
1680                target_event.raw(),
1681                event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
1682                &self.room_version_rules.redaction,
1683            ) {
1684                // It's safe to cast `redacted_event` here:
1685                // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
1686                //   when calling .raw(), so it's still one under the hood.
1687                // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
1688                target_event.replace_raw(redacted_event.cast_unchecked());
1689
1690                self.replace_event_at(location, target_event).await?;
1691
1692                // If the redacted event was part of a thread, remove it in the thread linked
1693                // chunk too, and make sure to update the thread root's summary
1694                // as well.
1695                //
1696                // Note: there is an ordering issue here: the above `replace_event_at` must
1697                // happen BEFORE we recompute the summary, otherwise the set of
1698                // replies may include the to-be-redacted event.
1699                if let Some(thread_root) = thread_root
1700                    && let Some(thread_cache) = self.threads.get_mut(&thread_root)
1701                {
1702                    thread_cache.remove_if_present(event_id);
1703
1704                    // The number of replies may have changed, so update the thread summary if
1705                    // needs be.
1706                    let latest_event_id = thread_cache.latest_event_id();
1707                    self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
1708                }
1709            }
1710
1711            Ok(())
1712        }
1713
1714        /// Save events into the database, without notifying observers.
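        ///
        /// A minimal usage sketch (not compiled; assumes a `state` binding of
        /// this type and an `event: Event` in scope):
        ///
        /// ```rust,ignore
        /// // Persist a single out-of-band event, without touching the room's
        /// // linked chunk or notifying observers.
        /// state.save_events([event]).await?;
        /// ```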
1715        pub async fn save_events(
1716            &self,
1717            events: impl IntoIterator<Item = Event>,
1718        ) -> Result<(), EventCacheError> {
1719            let store = self.store.clone();
1720            let room_id = self.room.clone();
1721            let events = events.into_iter().collect::<Vec<_>>();
1722
1723            // Spawn a task so the save is uninterrupted by task cancellation.
1724            spawn(async move {
1725                let store = store.lock().await?;
1726                for event in events {
1727                    store.save_event(&room_id, event).await?;
1728                }
1729                super::Result::Ok(())
1730            })
1731            .await
1732            .expect("joining failed")?;
1733
1734            Ok(())
1735        }
1736
1737        /// Handle the result of a sync.
1738        ///
1739        /// It may send room event cache updates to the given sender, if it
1740        /// generated any of those.
1741        ///
        /// The first element of the returned tuple is `true` if a new gap
        /// (previous-batch token) has been inserted, `false` otherwise.
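        ///
        /// A minimal caller sketch (not compiled; assumes a `state` binding of
        /// this type and a `timeline: Timeline` coming from a sync response):
        ///
        /// ```rust,ignore
        /// let (has_new_gap, diffs) = state.handle_sync(timeline).await?;
        /// if !diffs.is_empty() {
        ///     // Per the `#[must_use]` contract, propagate `diffs` to observers,
        ///     // e.g. as a `RoomEventCacheUpdate::UpdateTimelineEvents`.
        /// }
        /// // `has_new_gap` is `true` if a previous-batch token has been inserted.
        /// ```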
1744        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1745        pub async fn handle_sync(
1746            &mut self,
1747            mut timeline: Timeline,
1748        ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
1749            let mut prev_batch = timeline.prev_batch.take();
1750
1751            let DeduplicationOutcome {
1752                all_events: events,
1753                in_memory_duplicated_event_ids,
1754                in_store_duplicated_event_ids,
1755                non_empty_all_duplicates: all_duplicates,
1756            } = filter_duplicate_events(
1757                &self.store,
1758                LinkedChunkId::Room(self.room.as_ref()),
1759                &self.room_linked_chunk,
1760                timeline.events,
1761            )
1762            .await?;
1763
            // If the timeline isn't limited, and we already knew about some past
            // events, then we definitely know what the timeline head is (either we
            // know about all the events persisted in storage, or we have a gap
            // somewhere). In this case, we can ditch the previous-batch token, as an
            // optimization to avoid unnecessary future back-pagination requests.
1770            //
1771            // We can also ditch it if we knew about all the events that came from sync,
1772            // namely, they were all deduplicated. In this case, using the
1773            // previous-batch token would only result in fetching other events we
1774            // knew about. This is slightly incorrect in the presence of
1775            // network splits, but this has shown to be Good Enough™.
            if (!timeline.limited && self.room_linked_chunk.events().next().is_some())
                || all_duplicates
            {
1779                prev_batch = None;
1780            }
1781
1782            if prev_batch.is_some() {
                // Sad time: there's a gap, somewhere, in the timeline, and there's at least one
                // non-duplicated event. We don't know which threads might have a gap, so we
                // must invalidate them all :(
1786                // TODO(bnjbvr): figure out a better catchup mechanism for threads.
1787                let mut summaries_to_update = Vec::new();
1788
1789                for (thread_root, thread) in self.threads.iter_mut() {
1790                    // Empty the thread's linked chunk.
1791                    thread.clear();
1792
1793                    summaries_to_update.push(thread_root.clone());
1794                }
1795
1796                // Now, update the summaries to indicate that we're not sure what the latest
1797                // thread event is. The thread count can remain as is, as it might still be
1798                // valid, and there's no good value to reset it to, anyways.
1799                for thread_root in summaries_to_update {
1800                    let Some((location, mut target_event)) = self.find_event(&thread_root).await?
1801                    else {
1802                        trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
1803                        continue;
1804                    };
1805
1806                    if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
1807                        prev_summary.latest_reply = None;
1808
1809                        target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);
1810
1811                        self.replace_event_at(location, target_event).await?;
1812                    }
1813                }
1814            }
1815
1816            if all_duplicates {
1817                // No new events and no gap (per the previous check), thus no need to change the
1818                // room state. We're done!
1819                return Ok((false, Vec::new()));
1820            }
1821
1822            let has_new_gap = prev_batch.is_some();
1823
1824            // If we've never waited for an initial previous-batch token, and we've now
1825            // inserted a gap, no need to wait for a previous-batch token later.
1826            if !self.waited_for_initial_prev_token && has_new_gap {
1827                self.waited_for_initial_prev_token = true;
1828            }
1829
1830            // Remove the old duplicated events.
1831            //
            // We don't have to worry about the removals changing the positions of the
            // existing events, because we are pushing all the _new_ `events` at the back.
1834            self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1835                .await?;
1836
1837            self.room_linked_chunk
1838                .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);
1839
1840            self.post_process_new_events(events, true).await?;
1841
1842            if timeline.limited && has_new_gap {
                // If there was a previous-batch token for a limited timeline, unload the
                // chunks so the linked chunk only contains the last one; otherwise, there
                // might be a valid gap in between, and observers may not render it (yet).
1846                //
1847                // We must do this *after* persisting these events to storage (in
1848                // `post_process_new_events`).
1849                self.shrink_to_last_chunk().await?;
1850            }
1851
1852            let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
1853
1854            Ok((has_new_gap, timeline_event_diffs))
1855        }
1856
1857        /// Handle the result of a single back-pagination request.
1858        ///
1859        /// If the `prev_token` is set, then this function will check that the
1860        /// corresponding gap is present in the in-memory linked chunk.
1861        /// If it's not the case, `Ok(None)` will be returned, and the
1862        /// caller may decide to do something based on that (e.g. restart a
1863        /// pagination).
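        ///
        /// A minimal caller sketch (not compiled; assumes a `state` binding of
        /// this type and the `events`, `new_token` and `prev_token` obtained
        /// from a `/messages` response in scope):
        ///
        /// ```rust,ignore
        /// match state.handle_backpagination(events, new_token, prev_token).await? {
        ///     Some((outcome, diffs)) => {
        ///         // Propagate `diffs` to observers; `outcome.reached_start`
        ///         // indicates whether the start of the timeline has been hit.
        ///     }
        ///     None => {
        ///         // The gap disappeared in the meantime: the caller may decide
        ///         // to restart the pagination.
        ///     }
        /// }
        /// ```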
1864        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1865        pub async fn handle_backpagination(
1866            &mut self,
1867            events: Vec<Event>,
1868            mut new_token: Option<String>,
1869            prev_token: Option<String>,
1870        ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
1871        {
1872            // Check that the previous token still exists; otherwise it's a sign that the
1873            // room's timeline has been cleared.
1874            let prev_gap_id = if let Some(token) = prev_token {
1875                // Find the corresponding gap in the in-memory linked chunk.
1876                let gap_chunk_id = self.room_linked_chunk.chunk_identifier(|chunk| {
1877                    matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
1878                });
1879
1880                if gap_chunk_id.is_none() {
1881                    // We got a previous-batch token from the linked chunk *before* running the
1882                    // request, but it is missing *after* completing the request.
1883                    //
1884                    // It may be a sign the linked chunk has been reset, but it's fine, per this
1885                    // function's contract.
1886                    return Ok(None);
1887                }
1888
1889                gap_chunk_id
1890            } else {
1891                None
1892            };
1893
1894            let DeduplicationOutcome {
1895                all_events: mut events,
1896                in_memory_duplicated_event_ids,
1897                in_store_duplicated_event_ids,
1898                non_empty_all_duplicates: all_duplicates,
1899            } = filter_duplicate_events(
1900                &self.store,
1901                LinkedChunkId::Room(self.room.as_ref()),
1902                &self.room_linked_chunk,
1903                events,
1904            )
1905            .await?;
1906
1907            // If not all the events have been back-paginated, we need to remove the
1908            // previous ones, otherwise we can end up with misordered events.
1909            //
1910            // Consider the following scenario:
1911            // - sync returns [D, E, F]
1912            // - then sync returns [] with a previous batch token PB1, so the internal
1913            //   linked chunk state is [D, E, F, PB1].
1914            // - back-paginating with PB1 may return [A, B, C, D, E, F].
1915            //
1916            // Only inserting the new events when replacing PB1 would result in a timeline
1917            // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
1918            // all the events, in case this happens (see also #4746).
1919
1920            if !all_duplicates {
1921                // Let's forget all the previous events.
1922                self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1923                    .await?;
1924            } else {
1925                // All new events are duplicated, they can all be ignored.
1926                events.clear();
1927                // The gap can be ditched too, as it won't be useful to backpaginate any
1928                // further.
1929                new_token = None;
1930            }
1931
1932            // `/messages` has been called with `dir=b` (backwards), so the events are in
1933            // the inverted order; reorder them.
1934            let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();
1935
1936            let new_gap = new_token.map(|prev_token| Gap { prev_token });
1937            let reached_start = self.room_linked_chunk.finish_back_pagination(
1938                prev_gap_id,
1939                new_gap,
1940                &topo_ordered_events,
1941            );
1942
1943            // Note: this flushes updates to the store.
1944            self.post_process_new_events(topo_ordered_events, false).await?;
1945
1946            let event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
1947
1948            Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
1949        }
1950
        /// Subscribe to the thread for a given root event, and get the
        /// (possibly empty) list of initially known events for that thread.
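        ///
        /// A minimal usage sketch (not compiled; assumes a `state` binding of
        /// this type and a `root: OwnedEventId` in scope):
        ///
        /// ```rust,ignore
        /// let (initial_events, mut receiver) = state.subscribe_to_thread(root);
        /// // `initial_events` may be empty; further `ThreadEventCacheUpdate`s
        /// // arrive on `receiver`.
        /// ```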
1953        pub fn subscribe_to_thread(
1954            &mut self,
1955            root: OwnedEventId,
1956        ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
1957            self.get_or_reload_thread(root).subscribe()
1958        }
1959
        /// Finish a network back-pagination for the given thread, applying the
        /// received events.
        ///
        /// Pagination always starts from the end of the thread, unless we
        /// previously paginated.
1963        pub fn finish_thread_network_pagination(
1964            &mut self,
1965            root: OwnedEventId,
1966            prev_token: Option<String>,
1967            new_token: Option<String>,
1968            events: Vec<Event>,
1969        ) -> Option<BackPaginationOutcome> {
1970            self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
1971        }
1972
1973        pub fn load_more_thread_events_backwards(
1974            &mut self,
1975            root: OwnedEventId,
1976        ) -> LoadMoreEventsBackwardsOutcome {
1977            self.get_or_reload_thread(root).load_more_events_backwards()
1978        }
1979    }
1980}
1981
1982/// An enum representing where an event has been found.
1983pub(super) enum EventLocation {
1984    /// Event lives in memory (and likely in the store!).
1985    Memory(Position),
1986
1987    /// Event lives in the store only, it has not been loaded in memory yet.
1988    Store,
1989}
1990
1991pub(super) use private::RoomEventCacheState;
1992
1993#[cfg(test)]
1994mod tests {
1995    use matrix_sdk_base::event_cache::Event;
1996    use matrix_sdk_test::{async_test, event_factory::EventFactory};
1997    use ruma::{
1998        RoomId, event_id,
1999        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2000        room_id, user_id,
2001    };
2002
2003    use crate::test_utils::logged_in_client;
2004
2005    #[async_test]
2006    async fn test_find_event_by_id_with_edit_relation() {
2007        let original_id = event_id!("$original");
2008        let related_id = event_id!("$related");
2009        let room_id = room_id!("!galette:saucisse.bzh");
2010        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2011
2012        assert_relations(
2013            room_id,
2014            f.text_msg("Original event").event_id(original_id).into(),
2015            f.text_msg("* An edited event")
2016                .edit(
2017                    original_id,
2018                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
2019                )
2020                .event_id(related_id)
2021                .into(),
2022            f,
2023        )
2024        .await;
2025    }
2026
2027    #[async_test]
2028    async fn test_find_event_by_id_with_thread_reply_relation() {
2029        let original_id = event_id!("$original");
2030        let related_id = event_id!("$related");
2031        let room_id = room_id!("!galette:saucisse.bzh");
2032        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2033
2034        assert_relations(
2035            room_id,
2036            f.text_msg("Original event").event_id(original_id).into(),
2037            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
2038            f,
2039        )
2040        .await;
2041    }
2042
2043    #[async_test]
2044    async fn test_find_event_by_id_with_reaction_relation() {
2045        let original_id = event_id!("$original");
2046        let related_id = event_id!("$related");
2047        let room_id = room_id!("!galette:saucisse.bzh");
2048        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2049
2050        assert_relations(
2051            room_id,
2052            f.text_msg("Original event").event_id(original_id).into(),
2053            f.reaction(original_id, ":D").event_id(related_id).into(),
2054            f,
2055        )
2056        .await;
2057    }
2058
2059    #[async_test]
2060    async fn test_find_event_by_id_with_poll_response_relation() {
2061        let original_id = event_id!("$original");
2062        let related_id = event_id!("$related");
2063        let room_id = room_id!("!galette:saucisse.bzh");
2064        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2065
2066        assert_relations(
2067            room_id,
2068            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2069                .event_id(original_id)
2070                .into(),
2071            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
2072            f,
2073        )
2074        .await;
2075    }
2076
2077    #[async_test]
2078    async fn test_find_event_by_id_with_poll_end_relation() {
2079        let original_id = event_id!("$original");
2080        let related_id = event_id!("$related");
2081        let room_id = room_id!("!galette:saucisse.bzh");
2082        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2083
2084        assert_relations(
2085            room_id,
2086            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2087                .event_id(original_id)
2088                .into(),
2089            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
2090            f,
2091        )
2092        .await;
2093    }
2094
2095    #[async_test]
2096    async fn test_find_event_by_id_with_filtered_relationships() {
2097        let original_id = event_id!("$original");
2098        let related_id = event_id!("$related");
2099        let associated_related_id = event_id!("$recursive_related");
2100        let room_id = room_id!("!galette:saucisse.bzh");
2101        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2102
2103        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2104        let related_event = event_factory
2105            .text_msg("* Edited event")
2106            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2107            .event_id(related_id)
2108            .into();
2109        let associated_related_event =
2110            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
2111
2112        let client = logged_in_client(None).await;
2113
2114        let event_cache = client.event_cache();
2115        event_cache.subscribe().unwrap();
2116
2117        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2118        let room = client.get_room(room_id).unwrap();
2119
2120        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2121
2122        // Save the original event.
2123        room_event_cache.save_events([original_event]).await;
2124
2125        // Save the related event.
2126        room_event_cache.save_events([related_event]).await;
2127
        // Save the associated related event, which reacts to the related event.
2129        room_event_cache.save_events([associated_related_event]).await;
2130
2131        let filter = Some(vec![RelationType::Replacement]);
2132        let (event, related_events) =
2133            room_event_cache.find_event_with_relations(original_id, filter).await.unwrap();
2134        // Fetched event is the right one.
2135        let cached_event_id = event.event_id().unwrap();
2136        assert_eq!(cached_event_id, original_id);
2137
2138        // There's only the edit event (an edit event can't have its own edit event).
2139        assert_eq!(related_events.len(), 1);
2140
2141        let related_event_id = related_events[0].event_id().unwrap();
2142        assert_eq!(related_event_id, related_id);
2143
        // Now we'll filter by thread relations instead; there should be no related events.
2145        let filter = Some(vec![RelationType::Thread]);
2146        let (event, related_events) =
2147            room_event_cache.find_event_with_relations(original_id, filter).await.unwrap();
2148        // Fetched event is the right one.
2149        let cached_event_id = event.event_id().unwrap();
2150        assert_eq!(cached_event_id, original_id);
        // No thread-related events found.
2152        assert!(related_events.is_empty());
2153    }
2154
2155    #[async_test]
2156    async fn test_find_event_by_id_with_recursive_relation() {
2157        let original_id = event_id!("$original");
2158        let related_id = event_id!("$related");
2159        let associated_related_id = event_id!("$recursive_related");
2160        let room_id = room_id!("!galette:saucisse.bzh");
2161        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2162
2163        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2164        let related_event = event_factory
2165            .text_msg("* Edited event")
2166            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2167            .event_id(related_id)
2168            .into();
2169        let associated_related_event =
2170            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
2171
2172        let client = logged_in_client(None).await;
2173
2174        let event_cache = client.event_cache();
2175        event_cache.subscribe().unwrap();
2176
2177        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2178        let room = client.get_room(room_id).unwrap();
2179
2180        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2181
2182        // Save the original event.
2183        room_event_cache.save_events([original_event]).await;
2184
2185        // Save the related event.
2186        room_event_cache.save_events([related_event]).await;
2187
        // Save the associated related event, which reacts to the related event.
2189        room_event_cache.save_events([associated_related_event]).await;
2190
2191        let (event, related_events) =
2192            room_event_cache.find_event_with_relations(original_id, None).await.unwrap();
2193        // Fetched event is the right one.
2194        let cached_event_id = event.event_id().unwrap();
2195        assert_eq!(cached_event_id, original_id);
2196
        // Both the related event and the recursively related event are present.
2198        assert_eq!(related_events.len(), 2);
2199
2200        let related_event_id = related_events[0].event_id().unwrap();
2201        assert_eq!(related_event_id, related_id);
2202        let related_event_id = related_events[1].event_id().unwrap();
2203        assert_eq!(related_event_id, associated_related_id);
2204    }
2205
2206    async fn assert_relations(
2207        room_id: &RoomId,
2208        original_event: Event,
2209        related_event: Event,
2210        event_factory: EventFactory,
2211    ) {
2212        let client = logged_in_client(None).await;
2213
2214        let event_cache = client.event_cache();
2215        event_cache.subscribe().unwrap();
2216
2217        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2218        let room = client.get_room(room_id).unwrap();
2219
2220        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2221
2222        // Save the original event.
2223        let original_event_id = original_event.event_id().unwrap();
2224        room_event_cache.save_events([original_event]).await;
2225
2226        // Save an unrelated event to check it's not in the related events list.
2227        let unrelated_id = event_id!("$2");
2228        room_event_cache
2229            .save_events([event_factory
2230                .text_msg("An unrelated event")
2231                .event_id(unrelated_id)
2232                .into()])
2233            .await;
2234
2235        // Save the related event.
2236        let related_id = related_event.event_id().unwrap();
2237        room_event_cache.save_events([related_event]).await;
2238
2239        let (event, related_events) =
2240            room_event_cache.find_event_with_relations(&original_event_id, None).await.unwrap();
2241        // Fetched event is the right one.
2242        let cached_event_id = event.event_id().unwrap();
2243        assert_eq!(cached_event_id, original_event_id);
2244
        // Only the actually related event is in the list of related events.
2246        let related_event_id = related_events[0].event_id().unwrap();
2247        assert_eq!(related_event_id, related_id);
2248    }
2249}
2250
2251#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
2252mod timed_tests {
2253    use std::sync::Arc;
2254
2255    use assert_matches::assert_matches;
2256    use assert_matches2::assert_let;
2257    use eyeball_im::VectorDiff;
2258    use futures_util::FutureExt;
2259    use matrix_sdk_base::{
2260        event_cache::{
2261            Gap,
2262            store::{EventCacheStore as _, MemoryStore},
2263        },
2264        linked_chunk::{
2265            ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
2266            lazy_loader::from_all_chunks,
2267        },
2268        store::StoreConfig,
2269        sync::{JoinedRoomUpdate, Timeline},
2270    };
2271    use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
2272    use ruma::{
2273        OwnedUserId, event_id,
2274        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2275        room_id, user_id,
2276    };
2277    use tokio::task::yield_now;
2278
2279    use super::RoomEventCacheGenericUpdate;
2280    use crate::{
2281        assert_let_timeout,
2282        event_cache::{RoomEventCacheUpdate, room::LoadMoreEventsBackwardsOutcome},
2283        test_utils::client::MockClientBuilder,
2284    };
2285
2286    #[async_test]
2287    async fn test_write_to_storage() {
2288        let room_id = room_id!("!galette:saucisse.bzh");
2289        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2290
2291        let event_cache_store = Arc::new(MemoryStore::new());
2292
2293        let client = MockClientBuilder::new(None)
2294            .on_builder(|builder| {
2295                builder.store_config(
2296                    StoreConfig::new("hodlor".to_owned())
2297                        .event_cache_store(event_cache_store.clone()),
2298                )
2299            })
2300            .build()
2301            .await;
2302
2303        let event_cache = client.event_cache();
2304
2305        // Don't forget to subscribe and like.
2306        event_cache.subscribe().unwrap();
2307
2308        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2309        let room = client.get_room(room_id).unwrap();
2310
2311        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2312        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2313
2314        // Propagate an update for a message and a prev-batch token.
2315        let timeline = Timeline {
2316            limited: true,
2317            prev_batch: Some("raclette".to_owned()),
2318            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
2319        };
2320
2321        room_event_cache
2322            .inner
2323            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2324            .await
2325            .unwrap();
2326
2327        // Just checking the generic update is correct.
2328        assert_matches!(
2329            generic_stream.recv().await,
2330            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2331                assert_eq!(expected_room_id, room_id);
2332            }
2333        );
2334
2335        // Check the storage.
2336        let linked_chunk = from_all_chunks::<3, _, _>(
2337            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2338        )
2339        .unwrap()
2340        .unwrap();
2341
2342        assert_eq!(linked_chunk.chunks().count(), 2);
2343
2344        let mut chunks = linked_chunk.chunks();
2345
2346        // We start with the gap.
2347        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
2348            assert_eq!(gap.prev_token, "raclette");
2349        });
2350
2351        // Then we have the stored event.
2352        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2353            assert_eq!(events.len(), 1);
2354            let deserialized = events[0].raw().deserialize().unwrap();
2355            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
2356            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
2357        });
2358
2359        // That's all, folks!
2360        assert!(chunks.next().is_none());
2361    }
2362
2363    #[async_test]
2364    async fn test_write_to_storage_strips_bundled_relations() {
2365        let room_id = room_id!("!galette:saucisse.bzh");
2366        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2367
2368        let event_cache_store = Arc::new(MemoryStore::new());
2369
2370        let client = MockClientBuilder::new(None)
2371            .on_builder(|builder| {
2372                builder.store_config(
2373                    StoreConfig::new("hodlor".to_owned())
2374                        .event_cache_store(event_cache_store.clone()),
2375                )
2376            })
2377            .build()
2378            .await;
2379
2380        let event_cache = client.event_cache();
2381
2382        // Don't forget to subscribe and like.
2383        event_cache.subscribe().unwrap();
2384
2385        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2386        let room = client.get_room(room_id).unwrap();
2387
2388        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2389        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2390
2391        // Propagate an update for a message with bundled relations.
2392        let ev = f
2393            .text_msg("hey yo")
2394            .sender(*ALICE)
2395            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
2396            .into_event();
2397
2398        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
2399
2400        room_event_cache
2401            .inner
2402            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2403            .await
2404            .unwrap();
2405
2406        // Just checking the generic update is correct.
2407        assert_matches!(
2408            generic_stream.recv().await,
2409            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2410                assert_eq!(expected_room_id, room_id);
2411            }
2412        );
2413
2414        // The in-memory linked chunk keeps the bundled relation.
2415        {
2416            let events = room_event_cache.events().await;
2417
2418            assert_eq!(events.len(), 1);
2419
2420            let ev = events[0].raw().deserialize().unwrap();
2421            assert_let!(
2422                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
2423            );
2424
2425            let original = msg.as_original().unwrap();
2426            assert_eq!(original.content.body(), "hey yo");
2427            assert!(original.unsigned.relations.replace.is_some());
2428        }
2429
2430        // The one in storage does not.
2431        let linked_chunk = from_all_chunks::<3, _, _>(
2432            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2433        )
2434        .unwrap()
2435        .unwrap();
2436
2437        assert_eq!(linked_chunk.chunks().count(), 1);
2438
2439        let mut chunks = linked_chunk.chunks();
2440        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2441            assert_eq!(events.len(), 1);
2442
2443            let ev = events[0].raw().deserialize().unwrap();
2444            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
2445
2446            let original = msg.as_original().unwrap();
2447            assert_eq!(original.content.body(), "hey yo");
2448            assert!(original.unsigned.relations.replace.is_none());
2449        });
2450
2451        // That's all, folks!
2452        assert!(chunks.next().is_none());
2453    }
2454
2455    #[async_test]
2456    async fn test_clear() {
2457        let room_id = room_id!("!galette:saucisse.bzh");
2458        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2459
2460        let event_cache_store = Arc::new(MemoryStore::new());
2461
2462        let event_id1 = event_id!("$1");
2463        let event_id2 = event_id!("$2");
2464
2465        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2466        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2467
2468        // Prefill the store with some data.
2469        event_cache_store
2470            .handle_linked_chunk_updates(
2471                LinkedChunkId::Room(room_id),
2472                vec![
2473                    // An empty items chunk.
2474                    Update::NewItemsChunk {
2475                        previous: None,
2476                        new: ChunkIdentifier::new(0),
2477                        next: None,
2478                    },
2479                    // A gap chunk.
2480                    Update::NewGapChunk {
2481                        previous: Some(ChunkIdentifier::new(0)),
2482                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
2483                        new: ChunkIdentifier::new(42),
2484                        next: None,
2485                        gap: Gap { prev_token: "comté".to_owned() },
2486                    },
2487                    // Another items chunk, non-empty this time.
2488                    Update::NewItemsChunk {
2489                        previous: Some(ChunkIdentifier::new(42)),
2490                        new: ChunkIdentifier::new(1),
2491                        next: None,
2492                    },
2493                    Update::PushItems {
2494                        at: Position::new(ChunkIdentifier::new(1), 0),
2495                        items: vec![ev1.clone()],
2496                    },
2497                    // And another items chunk, non-empty again.
2498                    Update::NewItemsChunk {
2499                        previous: Some(ChunkIdentifier::new(1)),
2500                        new: ChunkIdentifier::new(2),
2501                        next: None,
2502                    },
2503                    Update::PushItems {
2504                        at: Position::new(ChunkIdentifier::new(2), 0),
2505                        items: vec![ev2.clone()],
2506                    },
2507                ],
2508            )
2509            .await
2510            .unwrap();
2511
2512        let client = MockClientBuilder::new(None)
2513            .on_builder(|builder| {
2514                builder.store_config(
2515                    StoreConfig::new("hodlor".to_owned())
2516                        .event_cache_store(event_cache_store.clone()),
2517                )
2518            })
2519            .build()
2520            .await;
2521
2522        let event_cache = client.event_cache();
2523
2524        // Don't forget to subscribe and like.
2525        event_cache.subscribe().unwrap();
2526
2527        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2528        let room = client.get_room(room_id).unwrap();
2529
2530        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2531
2532        let (items, mut stream) = room_event_cache.subscribe().await;
2533        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2534
        // The room knows about all cached events.
2536        {
2537            assert!(room_event_cache.find_event(event_id1).await.is_some());
2538            assert!(room_event_cache.find_event(event_id2).await.is_some());
2539        }
2540
        // But only part of the events are loaded from the store.
2542        {
2543            // The room must contain only one event because only one chunk has been loaded.
2544            assert_eq!(items.len(), 1);
2545            assert_eq!(items[0].event_id().unwrap(), event_id2);
2546
2547            assert!(stream.is_empty());
2548        }
2549
2550        // Let's load more chunks to load all events.
2551        {
2552            room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2553
2554            assert_let_timeout!(
2555                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2556            );
2557            assert_eq!(diffs.len(), 1);
2558            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2559                // Here you are `event_id1`!
2560                assert_eq!(event.event_id().unwrap(), event_id1);
2561            });
2562
2563            assert!(stream.is_empty());
2564        }
2565
2566        // After clearing,…
2567        room_event_cache.clear().await.unwrap();
2568
2569        //… we get an update that the content has been cleared.
2570        assert_let_timeout!(
2571            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2572        );
2573        assert_eq!(diffs.len(), 1);
2574        assert_let!(VectorDiff::Clear = &diffs[0]);
2575
2576        // … same with a generic update.
2577        assert_let_timeout!(
2578            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
2579        );
2580        assert_eq!(received_room_id, room_id);
2581
2582        // Events individually are not forgotten by the event cache, after clearing a
2583        // room.
2584        assert!(room_event_cache.find_event(event_id1).await.is_some());
2585
2586        // But their presence in a linked chunk is forgotten.
2587        let items = room_event_cache.events().await;
2588        assert!(items.is_empty());
2589
2590        // The event cache store too.
2591        let linked_chunk = from_all_chunks::<3, _, _>(
2592            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2593        )
2594        .unwrap()
2595        .unwrap();
2596
2597        // Note: while the event cache store could return `None` here, clearing it will
2598        // reset it to its initial form, maintaining the invariant that it
2599        // contains a single items chunk that's empty.
2600        assert_eq!(linked_chunk.num_items(), 0);
2601    }
2602
2603    #[async_test]
2604    async fn test_load_from_storage() {
2605        let room_id = room_id!("!galette:saucisse.bzh");
2606        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2607
2608        let event_cache_store = Arc::new(MemoryStore::new());
2609
2610        let event_id1 = event_id!("$1");
2611        let event_id2 = event_id!("$2");
2612
2613        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2614        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2615
2616        // Prefill the store with some data.
2617        event_cache_store
2618            .handle_linked_chunk_updates(
2619                LinkedChunkId::Room(room_id),
2620                vec![
2621                    // An empty items chunk.
2622                    Update::NewItemsChunk {
2623                        previous: None,
2624                        new: ChunkIdentifier::new(0),
2625                        next: None,
2626                    },
2627                    // A gap chunk.
2628                    Update::NewGapChunk {
2629                        previous: Some(ChunkIdentifier::new(0)),
2630                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
2631                        new: ChunkIdentifier::new(42),
2632                        next: None,
2633                        gap: Gap { prev_token: "cheddar".to_owned() },
2634                    },
2635                    // Another items chunk, non-empty this time.
2636                    Update::NewItemsChunk {
2637                        previous: Some(ChunkIdentifier::new(42)),
2638                        new: ChunkIdentifier::new(1),
2639                        next: None,
2640                    },
2641                    Update::PushItems {
2642                        at: Position::new(ChunkIdentifier::new(1), 0),
2643                        items: vec![ev1.clone()],
2644                    },
2645                    // And another items chunk, non-empty again.
2646                    Update::NewItemsChunk {
2647                        previous: Some(ChunkIdentifier::new(1)),
2648                        new: ChunkIdentifier::new(2),
2649                        next: None,
2650                    },
2651                    Update::PushItems {
2652                        at: Position::new(ChunkIdentifier::new(2), 0),
2653                        items: vec![ev2.clone()],
2654                    },
2655                ],
2656            )
2657            .await
2658            .unwrap();
2659
2660        let client = MockClientBuilder::new(None)
2661            .on_builder(|builder| {
2662                builder.store_config(
2663                    StoreConfig::new("hodlor".to_owned())
2664                        .event_cache_store(event_cache_store.clone()),
2665                )
2666            })
2667            .build()
2668            .await;
2669
2670        let event_cache = client.event_cache();
2671
2672        // Don't forget to subscribe and like.
2673        event_cache.subscribe().unwrap();
2674
2675        // Let's check whether the generic updates are received for the initialisation.
2676        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2677
2678        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2679        let room = client.get_room(room_id).unwrap();
2680
2681        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2682
2683        // The room event cache has been loaded. A generic update must have been
2684        // triggered.
2685        assert_matches!(
2686            generic_stream.recv().await,
2687            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2688                assert_eq!(room_id, expected_room_id);
2689            }
2690        );
2691
2692        let (items, mut stream) = room_event_cache.subscribe().await;
2693
2694        // The initial items contain one event because only the last chunk is loaded by
2695        // default.
2696        assert_eq!(items.len(), 1);
2697        assert_eq!(items[0].event_id().unwrap(), event_id2);
2698        assert!(stream.is_empty());
2699
2700        // The event cache knows about all events though, even if they aren't loaded.
2701        assert!(room_event_cache.find_event(event_id1).await.is_some());
2702        assert!(room_event_cache.find_event(event_id2).await.is_some());
2703
2704        // Let's paginate to load more events.
2705        room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2706
2707        assert_let_timeout!(
2708            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2709        );
2710        assert_eq!(diffs.len(), 1);
2711        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2712            assert_eq!(event.event_id().unwrap(), event_id1);
2713        });
2714
2715        assert!(stream.is_empty());
2716
2717        // A generic update is triggered too.
2718        assert_matches!(
2719            generic_stream.recv().await,
2720            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2721                assert_eq!(expected_room_id, room_id);
2722            }
2723        );
2724
2725        // A new update with one of these events leads to deduplication.
2726        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
2727
2728        room_event_cache
2729            .inner
2730            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2731            .await
2732            .unwrap();
2733
2734        // Check that no generic update is emitted: the event is a duplicate, so there
2735        // are no generic changes whatsoever!
2736        assert!(generic_stream.recv().now_or_never().is_none());
2737
2738        // The stream doesn't report these changes *yet*. Read the current events to
2739        // check that the items correspond to their new positions: the duplicated
2740        // item has been removed from its previous position (so it's not the first
2741        // element anymore) and re-added to the back of the list.
2742        let items = room_event_cache.events().await;
2743        assert_eq!(items.len(), 2);
2744        assert_eq!(items[0].event_id().unwrap(), event_id1);
2745        assert_eq!(items[1].event_id().unwrap(), event_id2);
2746    }
2747
2748    #[async_test]
2749    async fn test_load_from_storage_resilient_to_failure() {
2750        let room_id = room_id!("!fondue:patate.ch");
2751        let event_cache_store = Arc::new(MemoryStore::new());
2752
2753        let event = EventFactory::new()
2754            .room(room_id)
2755            .sender(user_id!("@ben:saucisse.bzh"))
2756            .text_msg("foo")
2757            .event_id(event_id!("$42"))
2758            .into_event();
2759
2760        // Prefill the store with invalid data: two chunks that form a cycle.
2761        event_cache_store
2762            .handle_linked_chunk_updates(
2763                LinkedChunkId::Room(room_id),
2764                vec![
2765                    Update::NewItemsChunk {
2766                        previous: None,
2767                        new: ChunkIdentifier::new(0),
2768                        next: None,
2769                    },
2770                    Update::PushItems {
2771                        at: Position::new(ChunkIdentifier::new(0), 0),
2772                        items: vec![event],
2773                    },
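                    // This chunk's `next` points back to chunk 0, which creates the
                    // invalid cycle.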
2774                    Update::NewItemsChunk {
2775                        previous: Some(ChunkIdentifier::new(0)),
2776                        new: ChunkIdentifier::new(1),
2777                        next: Some(ChunkIdentifier::new(0)),
2778                    },
2779                ],
2780            )
2781            .await
2782            .unwrap();
2783
2784        let client = MockClientBuilder::new(None)
2785            .on_builder(|builder| {
2786                builder.store_config(
2787                    StoreConfig::new("holder".to_owned())
2788                        .event_cache_store(event_cache_store.clone()),
2789                )
2790            })
2791            .build()
2792            .await;
2793
2794        let event_cache = client.event_cache();
2795
2796        // Don't forget to subscribe and like.
2797        event_cache.subscribe().unwrap();
2798
2799        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2800        let room = client.get_room(room_id).unwrap();
2801
2802        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2803
2804        let items = room_event_cache.events().await;
2805
2806        // Because the persisted content was invalid, the room store is reset: there are
2807        // no events in the cache.
2808        assert!(items.is_empty());
2809
2810        // Storage doesn't contain anything. It would also be valid for it to contain
2811        // a single, initial, empty items chunk.
2812        let raw_chunks =
2813            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
2814        assert!(raw_chunks.is_empty());
2815    }
2816
2817    #[async_test]
2818    async fn test_no_useless_gaps() {
2819        let room_id = room_id!("!galette:saucisse.bzh");
2820
2821        let client = MockClientBuilder::new(None).build().await;
2822
2823        let event_cache = client.event_cache();
2824        event_cache.subscribe().unwrap();
2825
2826        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2827        let room = client.get_room(room_id).unwrap();
2828        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2829        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2830
2831        let f = EventFactory::new().room(room_id).sender(*ALICE);
2832
2833        // Propagate an update including a limited timeline with one message and a
2834        // prev-batch token.
2835        room_event_cache
2836            .inner
2837            .handle_joined_room_update(JoinedRoomUpdate {
2838                timeline: Timeline {
2839                    limited: true,
2840                    prev_batch: Some("raclette".to_owned()),
2841                    events: vec![f.text_msg("hey yo").into_event()],
2842                },
2843                ..Default::default()
2844            })
2845            .await
2846            .unwrap();
2847
2848        // Just checking the generic update is correct.
2849        assert_matches!(
2850            generic_stream.recv().await,
2851            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2852                assert_eq!(expected_room_id, room_id);
2853            }
2854        );
2855
2856        {
2857            let mut state = room_event_cache.inner.state.write().await;
2858
2859            let mut num_gaps = 0;
2860            let mut num_events = 0;
2861
2862            for c in state.room_linked_chunk().chunks() {
2863                match c.content() {
2864                    ChunkContent::Items(items) => num_events += items.len(),
2865                    ChunkContent::Gap(_) => num_gaps += 1,
2866                }
2867            }
2868
2869            // The limited sync unloads the previous chunks, so it appears as if only
2870            // the new events are present.
2871            assert_eq!(num_gaps, 0);
2872            assert_eq!(num_events, 1);
2873
2874            // But if we manually load more of the linked chunk, the gap will be present.
2875            assert_matches!(
2876                state.load_more_events_backwards().await.unwrap(),
2877                LoadMoreEventsBackwardsOutcome::Gap { .. }
2878            );
2879
2880            num_gaps = 0;
2881            num_events = 0;
2882            for c in state.room_linked_chunk().chunks() {
2883                match c.content() {
2884                    ChunkContent::Items(items) => num_events += items.len(),
2885                    ChunkContent::Gap(_) => num_gaps += 1,
2886                }
2887            }
2888
2889            // The gap must have been stored.
2890            assert_eq!(num_gaps, 1);
2891            assert_eq!(num_events, 1);
2892        }
2893
2894        // Now, propagate an update for another message, but the timeline isn't limited
2895        // this time.
2896        room_event_cache
2897            .inner
2898            .handle_joined_room_update(JoinedRoomUpdate {
2899                timeline: Timeline {
2900                    limited: false,
2901                    prev_batch: Some("fondue".to_owned()),
2902                    events: vec![f.text_msg("sup").into_event()],
2903                },
2904                ..Default::default()
2905            })
2906            .await
2907            .unwrap();
2908
2909        // Just checking the generic update is correct.
2910        assert_matches!(
2911            generic_stream.recv().await,
2912            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2913                assert_eq!(expected_room_id, room_id);
2914            }
2915        );
2916
2917        {
2918            let state = room_event_cache.inner.state.read().await;
2919
2920            let mut num_gaps = 0;
2921            let mut num_events = 0;
2922
2923            for c in state.room_linked_chunk().chunks() {
2924                match c.content() {
2925                    ChunkContent::Items(items) => num_events += items.len(),
2926                    ChunkContent::Gap(gap) => {
2927                        assert_eq!(gap.prev_token, "raclette");
2928                        num_gaps += 1;
2929                    }
2930                }
2931            }
2932
2933            // There's only the previous gap, no new ones.
2934            assert_eq!(num_gaps, 1);
2935            assert_eq!(num_events, 2);
2936        }
2937    }
2938
2939    #[async_test]
2940    async fn test_shrink_to_last_chunk() {
2941        let room_id = room_id!("!galette:saucisse.bzh");
2942
2943        let client = MockClientBuilder::new(None).build().await;
2944
2945        let f = EventFactory::new().room(room_id);
2946
2947        let evid1 = event_id!("$1");
2948        let evid2 = event_id!("$2");
2949
2950        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
2951        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
2952
2953        // Fill the event cache store with an initial linked chunk made of two event chunks.
2954        {
2955            let store = client.event_cache_store();
2956            let store = store.lock().await.unwrap();
2957            store
2958                .handle_linked_chunk_updates(
2959                    LinkedChunkId::Room(room_id),
2960                    vec![
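                        // The first events chunk, which will hold `ev1`.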
2961                        Update::NewItemsChunk {
2962                            previous: None,
2963                            new: ChunkIdentifier::new(0),
2964                            next: None,
2965                        },
2966                        Update::PushItems {
2967                            at: Position::new(ChunkIdentifier::new(0), 0),
2968                            items: vec![ev1],
2969                        },
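                        // The second (and last) events chunk, which will hold `ev2`. Only
                        // this chunk is loaded in memory when the room event cache starts.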
2970                        Update::NewItemsChunk {
2971                            previous: Some(ChunkIdentifier::new(0)),
2972                            new: ChunkIdentifier::new(1),
2973                            next: None,
2974                        },
2975                        Update::PushItems {
2976                            at: Position::new(ChunkIdentifier::new(1), 0),
2977                            items: vec![ev2],
2978                        },
2979                    ],
2980                )
2981                .await
2982                .unwrap();
2983        }
2984
2985        let event_cache = client.event_cache();
2986        event_cache.subscribe().unwrap();
2987
2988        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2989        let room = client.get_room(room_id).unwrap();
2990        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2991
2992        // Sanity check: lazily loaded, so only includes one item at start.
2993        let (events, mut stream) = room_event_cache.subscribe().await;
2994        assert_eq!(events.len(), 1);
2995        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
2996        assert!(stream.is_empty());
2997
2998        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2999
3000        // Force loading the full linked chunk by back-paginating.
3001        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3002        assert_eq!(outcome.events.len(), 1);
3003        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3004        assert!(outcome.reached_start);
3005
3006        // We also get an update about the loading from the store.
3007        assert_let_timeout!(
3008            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3009        );
3010        assert_eq!(diffs.len(), 1);
3011        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3012            assert_eq!(value.event_id().as_deref(), Some(evid1));
3013        });
3014
3015        assert!(stream.is_empty());
3016
3017        // Same for the generic update.
3018        assert_let_timeout!(
3019            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
3020        );
3021        assert_eq!(received_room_id, room_id);
3022
3023        // Shrink the linked chunk to the last chunk.
3024        let diffs = room_event_cache
3025            .inner
3026            .state
3027            .write()
3028            .await
3029            .force_shrink_to_last_chunk()
3030            .await
3031            .expect("shrinking should succeed");
3032
3033        // We receive updates about the changes to the linked chunk.
3034        assert_eq!(diffs.len(), 2);
3035        assert_matches!(&diffs[0], VectorDiff::Clear);
3036        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
3037            assert_eq!(values.len(), 1);
3038            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
3039        });
3040
3041        assert!(stream.is_empty());
3042
3043        // No generic update is sent in this case.
3044        assert!(generic_stream.is_empty());
3045
3046        // When reading the events, we do get only the last one.
3047        let events = room_event_cache.events().await;
3048        assert_eq!(events.len(), 1);
3049        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3050
3051        // But if we back-paginate, we don't need network access to find out about
3052        // the previous event.
3053        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3054        assert_eq!(outcome.events.len(), 1);
3055        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3056        assert!(outcome.reached_start);
3057    }
3058
3059    #[async_test]
3060    async fn test_room_ordering() {
3061        let room_id = room_id!("!galette:saucisse.bzh");
3062
3063        let client = MockClientBuilder::new(None).build().await;
3064
3065        let f = EventFactory::new().room(room_id).sender(*ALICE);
3066
3067        let evid1 = event_id!("$1");
3068        let evid2 = event_id!("$2");
3069        let evid3 = event_id!("$3");
3070
3071        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
3072        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3073        let ev3 = f.text_msg("yo").event_id(evid3).into_event();
3074
3075        // Fill the event cache store with an initial linked chunk made of two event chunks.
3076        {
3077            let store = client.event_cache_store();
3078            let store = store.lock().await.unwrap();
3079            store
3080                .handle_linked_chunk_updates(
3081                    LinkedChunkId::Room(room_id),
3082                    vec![
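                        // The first events chunk, which will hold `ev1` and `ev2`.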
3083                        Update::NewItemsChunk {
3084                            previous: None,
3085                            new: ChunkIdentifier::new(0),
3086                            next: None,
3087                        },
3088                        Update::PushItems {
3089                            at: Position::new(ChunkIdentifier::new(0), 0),
3090                            items: vec![ev1, ev2],
3091                        },
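                        // The second (and last) events chunk, which will hold `ev3`. Only
                        // this chunk is loaded in memory at first.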
3092                        Update::NewItemsChunk {
3093                            previous: Some(ChunkIdentifier::new(0)),
3094                            new: ChunkIdentifier::new(1),
3095                            next: None,
3096                        },
3097                        Update::PushItems {
3098                            at: Position::new(ChunkIdentifier::new(1), 0),
3099                            items: vec![ev3.clone()],
3100                        },
3101                    ],
3102                )
3103                .await
3104                .unwrap();
3105        }
3106
3107        let event_cache = client.event_cache();
3108        event_cache.subscribe().unwrap();
3109
3110        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3111        let room = client.get_room(room_id).unwrap();
3112        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3113
3114        // Initially, the linked chunk only contains the last chunk, so only ev3 is
3115        // loaded.
3116        {
3117            let state = room_event_cache.inner.state.read().await;
3118
3119            // But we can get the order of ev1.
3120            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));
3121
3122            // And that of ev2 as well.
3123            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));
3124
3125            // ev3, which is loaded, also has a known ordering.
3126            let mut events = state.room_linked_chunk().events();
3127            let (pos, ev) = events.next().unwrap();
3128            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
3129            assert_eq!(ev.event_id().as_deref(), Some(evid3));
3130            assert_eq!(state.room_event_order(pos), Some(2));
3131
3132            // No other loaded events.
3133            assert!(events.next().is_none());
3134        }
3135
3136        // Force loading the full linked chunk by back-paginating.
3137        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3138        assert!(outcome.reached_start);
3139
3140        // All events are now loaded, so their order is precisely their enumerated index
3141        // in a linear iteration.
3142        {
3143            let state = room_event_cache.inner.state.read().await;
3144            for (i, (pos, _)) in state.room_linked_chunk().events().enumerate() {
3145                assert_eq!(state.room_event_order(pos), Some(i));
3146            }
3147        }
3148
3149        // Handle a gappy sync with two events (including one duplicate, so
3150        // deduplication kicks in). This shrinks the linked chunk to the last
3151        // chunk, which then only contains the last two events.
3153        let evid4 = event_id!("$4");
3154        room_event_cache
3155            .inner
3156            .handle_joined_room_update(JoinedRoomUpdate {
3157                timeline: Timeline {
3158                    limited: true,
3159                    prev_batch: Some("fondue".to_owned()),
3160                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
3161                },
3162                ..Default::default()
3163            })
3164            .await
3165            .unwrap();
3166
3167        {
3168            let state = room_event_cache.inner.state.read().await;
3169
3170            // After the shrink, only evid3 and evid4 are loaded.
3171            let mut events = state.room_linked_chunk().events();
3172
3173            let (pos, ev) = events.next().unwrap();
3174            assert_eq!(ev.event_id().as_deref(), Some(evid3));
3175            assert_eq!(state.room_event_order(pos), Some(2));
3176
3177            let (pos, ev) = events.next().unwrap();
3178            assert_eq!(ev.event_id().as_deref(), Some(evid4));
3179            assert_eq!(state.room_event_order(pos), Some(3));
3180
3181            // No other loaded events.
3182            assert!(events.next().is_none());
3183
3184            // But we can still get the order of previous events.
3185            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));
3186            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));
3187
3188            // ev3 doesn't have an order with its previous position, since it's been
3189            // deduplicated.
3190            assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(1), 0)), None);
3191        }
3192    }
3193
3194    #[async_test]
3195    async fn test_auto_shrink_after_all_subscribers_are_gone() {
3196        let room_id = room_id!("!galette:saucisse.bzh");
3197
3198        let client = MockClientBuilder::new(None).build().await;
3199
3200        let f = EventFactory::new().room(room_id);
3201
3202        let evid1 = event_id!("$1");
3203        let evid2 = event_id!("$2");
3204
3205        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3206        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3207
3208        // Fill the event cache store with an initial linked chunk made of two event chunks.
3209        {
3210            let store = client.event_cache_store();
3211            let store = store.lock().await.unwrap();
3212            store
3213                .handle_linked_chunk_updates(
3214                    LinkedChunkId::Room(room_id),
3215                    vec![
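                        // The first events chunk, which will hold `ev1`.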
3216                        Update::NewItemsChunk {
3217                            previous: None,
3218                            new: ChunkIdentifier::new(0),
3219                            next: None,
3220                        },
3221                        Update::PushItems {
3222                            at: Position::new(ChunkIdentifier::new(0), 0),
3223                            items: vec![ev1],
3224                        },
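                        // The second (and last) events chunk, which will hold `ev2`. Only
                        // this chunk is loaded in memory at first.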
3225                        Update::NewItemsChunk {
3226                            previous: Some(ChunkIdentifier::new(0)),
3227                            new: ChunkIdentifier::new(1),
3228                            next: None,
3229                        },
3230                        Update::PushItems {
3231                            at: Position::new(ChunkIdentifier::new(1), 0),
3232                            items: vec![ev2],
3233                        },
3234                    ],
3235                )
3236                .await
3237                .unwrap();
3238        }
3239
3240        let event_cache = client.event_cache();
3241        event_cache.subscribe().unwrap();
3242
3243        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3244        let room = client.get_room(room_id).unwrap();
3245        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3246
3247        // Sanity check: lazily loaded, so only includes one item at start.
3248        let (events1, mut stream1) = room_event_cache.subscribe().await;
3249        assert_eq!(events1.len(), 1);
3250        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
3251        assert!(stream1.is_empty());
3252
3253        // Force loading the full linked chunk by back-paginating.
3254        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3255        assert_eq!(outcome.events.len(), 1);
3256        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3257        assert!(outcome.reached_start);
3258
3259        // We also get an update about the loading from the store. Ignore it, for this
3260        // test's sake.
3261        assert_let_timeout!(
3262            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
3263        );
3264        assert_eq!(diffs.len(), 1);
3265        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3266            assert_eq!(value.event_id().as_deref(), Some(evid1));
3267        });
3268
3269        assert!(stream1.is_empty());
3270
3271        // Have another subscriber.
3272        // Since it's not the first one, and the previous one loaded some more events,
3273        // the second subscriber sees them all.
3274        let (events2, stream2) = room_event_cache.subscribe().await;
3275        assert_eq!(events2.len(), 2);
3276        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
3277        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
3278        assert!(stream2.is_empty());
3279
3280        // Drop the first stream, and wait a bit.
3281        drop(stream1);
3282        yield_now().await;
3283
3284        // The second stream remains undisturbed.
3285        assert!(stream2.is_empty());
3286
3287        // Now drop the second stream, and wait a bit.
3288        drop(stream2);
3289        yield_now().await;
3290
3291        // The linked chunk must have auto-shrunk by now.
3292
3293        {
3294            // Check the inner state: the subscriber count has dropped to zero.
3295            let state = room_event_cache.inner.state.read().await;
3296            assert_eq!(state.subscriber_count.load(std::sync::atomic::Ordering::SeqCst), 0);
3297        }
3298
3299        // Getting the events will only give us the latest chunk.
3300        let events3 = room_event_cache.events().await;
3301        assert_eq!(events3.len(), 1);
3302        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
3303    }
3304
3305    #[async_test]
3306    async fn test_rfind_map_event_in_memory_by() {
3307        let user_id = user_id!("@mnt_io:matrix.org");
3308        let room_id = room_id!("!raclette:patate.ch");
3309        let client = MockClientBuilder::new(None).build().await;
3310
3311        let event_factory = EventFactory::new().room(room_id);
3312
3313        let event_id_0 = event_id!("$ev0");
3314        let event_id_1 = event_id!("$ev1");
3315        let event_id_2 = event_id!("$ev2");
3316        let event_id_3 = event_id!("$ev3");
3317
3318        let event_0 =
3319            event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3320        let event_1 =
3321            event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3322        let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3323        let event_3 =
3324            event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3325
3326        // Fill the event cache store with an initial linked chunk made of two chunks
3327        // and four events.
3328        {
3329            let store = client.event_cache_store();
3330            let store = store.lock().await.unwrap();
3331            store
3332                .handle_linked_chunk_updates(
3333                    LinkedChunkId::Room(room_id),
3334                    vec![
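                        // The first (oldest) events chunk, which will hold `event_3`. It
                        // stays in storage and is not loaded in memory.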
3335                        Update::NewItemsChunk {
3336                            previous: None,
3337                            new: ChunkIdentifier::new(0),
3338                            next: None,
3339                        },
3340                        Update::PushItems {
3341                            at: Position::new(ChunkIdentifier::new(0), 0),
3342                            items: vec![event_3],
3343                        },
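                        // The second (and last) events chunk, which will hold `event_0`,
                        // `event_1` and `event_2`. Only this chunk is loaded in memory.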
3344                        Update::NewItemsChunk {
3345                            previous: Some(ChunkIdentifier::new(0)),
3346                            new: ChunkIdentifier::new(1),
3347                            next: None,
3348                        },
3349                        Update::PushItems {
3350                            at: Position::new(ChunkIdentifier::new(1), 0),
3351                            items: vec![event_0, event_1, event_2],
3352                        },
3353                    ],
3354                )
3355                .await
3356                .unwrap();
3357        }
3358
3359        let event_cache = client.event_cache();
3360        event_cache.subscribe().unwrap();
3361
3362        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3363        let room = client.get_room(room_id).unwrap();
3364        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3365
3366        // Look for an event from `BOB`: it must be `event_0`.
3367        assert_matches!(
3368            room_event_cache
3369                .rfind_map_event_in_memory_by(|event| {
3370                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
                        == Some(*BOB))
                    .then(|| event.event_id())
3371                })
3372                .await,
3373            Some(event_id) => {
3374                assert_eq!(event_id.as_deref(), Some(event_id_0));
3375            }
3376        );
3377
3378        // Look for an event from `ALICE`: it must be `event_2`, not `event_1`,
3379        // because events are searched in reverse order.
3380        assert_matches!(
3381            room_event_cache
3382                .rfind_map_event_in_memory_by(|event| {
3383                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
                        == Some(*ALICE))
                    .then(|| event.event_id())
3384                })
3385                .await,
3386            Some(event_id) => {
3387                assert_eq!(event_id.as_deref(), Some(event_id_2));
3388            }
3389        );
3390
3391        // Look for an event that is inside the storage, but not loaded.
3392        assert!(
3393            room_event_cache
3394                .rfind_map_event_in_memory_by(|event| {
3395                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
3396                        == Some(user_id))
3397                    .then(|| event.event_id())
3398                })
3399                .await
3400                .is_none()
3401        );
3402
3403        // Look for an event that doesn't exist.
3404        assert!(room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.is_none());
3405    }
3406}