matrix_sdk/event_cache/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All event cache types for a single room.
16
17use std::{
18    collections::BTreeMap,
19    fmt,
20    ops::{Deref, DerefMut},
21    sync::{
22        Arc,
23        atomic::{AtomicUsize, Ordering},
24    },
25};
26
27use events::sort_positions_descending;
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31    deserialized_responses::AmbiguityChange,
32    event_cache::Event,
33    linked_chunk::Position,
34    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
35};
36use ruma::{
37    EventId, OwnedEventId, OwnedRoomId, RoomId,
38    api::Direction,
39    events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
40    serde::Raw,
41};
42use tokio::sync::{
43    Notify,
44    broadcast::{Receiver, Sender},
45    mpsc,
46};
47use tracing::{instrument, trace, warn};
48
49use super::{
50    AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
51    RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
52};
53use crate::{
54    client::WeakClient,
55    event_cache::EventCacheError,
56    room::{IncludeRelations, RelationsOptions, WeakRoom},
57};
58
59pub(super) mod events;
60mod pinned_events;
61mod threads;
62
63pub use threads::ThreadEventCacheUpdate;
64
/// A subset of an event cache, for a room.
///
/// Cloning is shallow, and thus is cheap to do.
#[derive(Clone)]
pub struct RoomEventCache {
    /// Shared, non-cloneable implementation of the cache; cloning the wrapper
    /// only bumps this [`Arc`]'s reference count.
    pub(super) inner: Arc<RoomEventCacheInner>,
}
72
73impl fmt::Debug for RoomEventCache {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        f.debug_struct("RoomEventCache").finish_non_exhaustive()
76    }
77}
78
/// Thin wrapper for a room event cache subscriber, so as to trigger
/// side-effects when all subscribers are gone.
///
/// The current side-effect is: auto-shrinking the [`RoomEventCache`] when no
/// more subscribers are active. This is an optimisation to reduce the number of
/// data held in memory by a [`RoomEventCache`]: when no more subscribers are
/// active, all data are reduced to the minimum.
///
/// The side-effect takes effect on `Drop`.
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheSubscriber {
    /// Underlying receiver of the room event cache's updates.
    recv: Receiver<RoomEventCacheUpdate>,

    /// To which room are we listening?
    room_id: OwnedRoomId,

    /// Sender to the auto-shrink channel.
    ///
    /// Used on `Drop` to notify the auto-shrink task when the last subscriber
    /// for this room goes away.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Shared count of the live subscribers for this room (see
    /// [`RoomEventCache::subscribe`], which increments it); decremented on
    /// `Drop`.
    subscriber_count: Arc<AtomicUsize>,
}
102
103impl Drop for RoomEventCacheSubscriber {
104    fn drop(&mut self) {
105        let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
106
107        trace!(
108            "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
109        );
110
111        if previous_subscriber_count == 1 {
112            // We were the last instance of the subscriber; let the auto-shrinker know by
113            // notifying it of our room id.
114
115            let mut room_id = self.room_id.clone();
116
117            // Try to send without waiting for channel capacity, and restart in a spin-loop
118            // if it failed (until a maximum number of attempts is reached, or
119            // the send was successful). The channel shouldn't be super busy in
120            // general, so this should resolve quickly enough.
121
122            let mut num_attempts = 0;
123
124            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
125                num_attempts += 1;
126
127                if num_attempts > 1024 {
128                    // If we've tried too many times, just give up with a warning; after all, this
129                    // is only an optimization.
130                    warn!(
131                        "couldn't send notification to the auto-shrink channel \
132                         after 1024 attempts; giving up"
133                    );
134                    return;
135                }
136
137                match err {
138                    mpsc::error::TrySendError::Full(stolen_room_id) => {
139                        room_id = stolen_room_id;
140                    }
141                    mpsc::error::TrySendError::Closed(_) => return,
142                }
143            }
144
145            trace!("sent notification to the parent channel that we were the last subscriber");
146        }
147    }
148}
149
150impl Deref for RoomEventCacheSubscriber {
151    type Target = Receiver<RoomEventCacheUpdate>;
152
153    fn deref(&self) -> &Self::Target {
154        &self.recv
155    }
156}
157
158impl DerefMut for RoomEventCacheSubscriber {
159    fn deref_mut(&mut self) -> &mut Self::Target {
160        &mut self.recv
161    }
162}
163
164impl RoomEventCache {
165    /// Create a new [`RoomEventCache`] using the given room and store.
166    pub(super) fn new(
167        client: WeakClient,
168        state: RoomEventCacheStateLock,
169        pagination_status: SharedObservable<RoomPaginationStatus>,
170        room_id: OwnedRoomId,
171        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
172        update_sender: Sender<RoomEventCacheUpdate>,
173        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
174    ) -> Self {
175        Self {
176            inner: Arc::new(RoomEventCacheInner::new(
177                client,
178                state,
179                pagination_status,
180                room_id,
181                auto_shrink_sender,
182                update_sender,
183                generic_update_sender,
184            )),
185        }
186    }
187
188    /// Get the room ID for this [`RoomEventCache`].
189    pub fn room_id(&self) -> &RoomId {
190        &self.inner.room_id
191    }
192
193    /// Read all current events.
194    ///
195    /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
196    /// subscriber.
197    pub async fn events(&self) -> Result<Vec<Event>> {
198        let state = self.inner.state.read().await?;
199
200        Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
201    }
202
203    /// Subscribe to this room updates, after getting the initial list of
204    /// events.
205    ///
206    /// Use [`RoomEventCache::events`] to get all current events without the
207    /// subscriber. Creating, and especially dropping, a
208    /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
209    pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
210        let state = self.inner.state.read().await?;
211        let events =
212            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
213
214        let subscriber_count = state.subscriber_count();
215        let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
216        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
217
218        let recv = self.inner.update_sender.subscribe();
219        let subscriber = RoomEventCacheSubscriber {
220            recv,
221            room_id: self.inner.room_id.clone(),
222            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
223            subscriber_count: subscriber_count.clone(),
224        };
225
226        Ok((events, subscriber))
227    }
228
229    /// Subscribe to thread for a given root event, and get a (maybe empty)
230    /// initially known list of events for that thread.
231    pub async fn subscribe_to_thread(
232        &self,
233        thread_root: OwnedEventId,
234    ) -> Result<(Vec<Event>, Receiver<ThreadEventCacheUpdate>)> {
235        let mut state = self.inner.state.write().await?;
236        Ok(state.subscribe_to_thread(thread_root))
237    }
238
239    /// Subscribe to the pinned event cache for this room.
240    ///
241    /// This is a persisted view over the pinned events of a room.
242    ///
243    /// The pinned events will be initially reloaded from storage, and/or loaded
244    /// from a network request to fetch the latest pinned events and their
245    /// relations, to update it as needed. The list of pinned events will
246    /// also be kept up-to-date as new events are pinned, and new
247    /// related events show up from other sources.
248    pub async fn subscribe_to_pinned_events(
249        &self,
250    ) -> Result<(Vec<Event>, Receiver<RoomEventCacheUpdate>)> {
251        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
252        let mut state = self.inner.state.write().await?;
253
254        state.subscribe_to_pinned_events(room).await
255    }
256
257    /// Paginate backwards in a thread, given its root event ID.
258    ///
259    /// Returns whether we've hit the start of the thread, in which case the
260    /// root event will be prepended to the thread.
261    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
262    pub async fn paginate_thread_backwards(
263        &self,
264        thread_root: OwnedEventId,
265        num_events: u16,
266    ) -> Result<bool> {
267        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
268
269        // Take the lock only for a short time here.
270        let mut outcome =
271            self.inner.state.write().await?.load_more_thread_events_backwards(thread_root.clone());
272
273        loop {
274            match outcome {
275                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
276                    // Start a threaded pagination from this gap.
277                    let options = RelationsOptions {
278                        from: prev_token.clone(),
279                        dir: Direction::Backward,
280                        limit: Some(num_events.into()),
281                        include_relations: IncludeRelations::AllRelations,
282                        recurse: true,
283                    };
284
285                    let mut result = room
286                        .relations(thread_root.clone(), options)
287                        .await
288                        .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
289
290                    let reached_start = result.next_batch_token.is_none();
291                    trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");
292
293                    // Because the state lock is taken again in `load_or_fetch_event`, we need
294                    // to do this *before* we take the state lock again.
295                    let root_event =
296                        if reached_start {
297                            // Prepend the thread root event to the results.
298                            Some(room.load_or_fetch_event(&thread_root, None).await.map_err(
299                                |err| EventCacheError::BackpaginationError(Box::new(err)),
300                            )?)
301                        } else {
302                            None
303                        };
304
305                    let mut state = self.inner.state.write().await?;
306
307                    // Save all the events (but the thread root) in the store.
308                    state.save_events(result.chunk.iter().cloned()).await?;
309
310                    // Note: the events are still in the reversed order at this point, so
311                    // pushing will eventually make it so that the root event is the first.
312                    result.chunk.extend(root_event);
313
314                    if let Some(outcome) = state.finish_thread_network_pagination(
315                        thread_root.clone(),
316                        prev_token,
317                        result.next_batch_token,
318                        result.chunk,
319                    ) {
320                        return Ok(outcome.reached_start);
321                    }
322
323                    // fallthrough: restart the pagination.
324                    outcome = state.load_more_thread_events_backwards(thread_root.clone());
325                }
326
327                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
328                    // We're done!
329                    return Ok(true);
330                }
331
332                LoadMoreEventsBackwardsOutcome::Events { .. } => {
333                    // TODO: implement :)
334                    unimplemented!("loading from disk for threads is not implemented yet");
335                }
336            }
337        }
338    }
339
340    /// Return a [`RoomPagination`] API object useful for running
341    /// back-pagination queries in the current room.
342    pub fn pagination(&self) -> RoomPagination {
343        RoomPagination { inner: self.inner.clone() }
344    }
345
346    /// Try to find a single event in this room, starting from the most recent
347    /// event.
348    ///
349    /// The `predicate` receives two arguments: the current event, and the
350    /// ID of the _previous_ (older) event.
351    ///
352    /// **Warning**! It looks into the loaded events from the in-memory linked
353    /// chunk **only**. It doesn't look inside the storage.
354    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
355    where
356        P: FnMut(&Event, Option<&Event>) -> Option<O>,
357    {
358        Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
359    }
360
361    /// Try to find an event by ID in this room.
362    ///
363    /// It starts by looking into loaded events before looking inside the
364    /// storage.
365    pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
366        Ok(self
367            .inner
368            .state
369            .read()
370            .await?
371            .find_event(event_id)
372            .await
373            .ok()
374            .flatten()
375            .map(|(_loc, event)| event))
376    }
377
378    /// Try to find an event by ID in this room, along with its related events.
379    ///
380    /// You can filter which types of related events to retrieve using
381    /// `filter`. `None` will retrieve related events of any type.
382    ///
383    /// The related events are sorted like this:
384    ///
385    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
386    ///   located at the beginning of the array.
387    /// - events present in the linked chunk (be it in memory or in the storage)
388    ///   will be sorted according to their ordering in the linked chunk.
389    pub async fn find_event_with_relations(
390        &self,
391        event_id: &EventId,
392        filter: Option<Vec<RelationType>>,
393    ) -> Result<Option<(Event, Vec<Event>)>> {
394        // Search in all loaded or stored events.
395        Ok(self
396            .inner
397            .state
398            .read()
399            .await?
400            .find_event_with_relations(event_id, filter.clone())
401            .await
402            .ok()
403            .flatten())
404    }
405
406    /// Try to find the related events for an event by ID in this room.
407    ///
408    /// You can filter which types of related events to retrieve using
409    /// `filter`. `None` will retrieve related events of any type.
410    ///
411    /// The related events are sorted like this:
412    ///
413    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
414    ///   located at the beginning of the array.
415    /// - events present in the linked chunk (be it in memory or in the storage)
416    ///   will be sorted according to their ordering in the linked chunk.
417    pub async fn find_event_relations(
418        &self,
419        event_id: &EventId,
420        filter: Option<Vec<RelationType>>,
421    ) -> Result<Vec<Event>> {
422        // Search in all loaded or stored events.
423        self.inner.state.read().await?.find_event_relations(event_id, filter.clone()).await
424    }
425
426    /// Clear all the storage for this [`RoomEventCache`].
427    ///
428    /// This will get rid of all the events from the linked chunk and persisted
429    /// storage.
430    pub async fn clear(&self) -> Result<()> {
431        // Clear the linked chunk and persisted storage.
432        let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
433
434        // Notify observers about the update.
435        let _ = self.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
436            diffs: updates_as_vector_diffs,
437            origin: EventsOrigin::Cache,
438        });
439
440        // Notify observers about the generic update.
441        let _ = self
442            .inner
443            .generic_update_sender
444            .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });
445
446        Ok(())
447    }
448
449    /// Handle a single event from the `SendQueue`.
450    pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
451        self.inner
452            .handle_timeline(
453                Timeline { limited: false, prev_batch: None, events: vec![event] },
454                Vec::new(),
455                BTreeMap::new(),
456            )
457            .await
458    }
459
460    /// Save some events in the event cache, for further retrieval with
461    /// [`Self::event`].
462    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
463        match self.inner.state.write().await {
464            Ok(mut state_guard) => {
465                if let Err(err) = state_guard.save_events(events).await {
466                    warn!("couldn't save event in the event cache: {err}");
467                }
468            }
469
470            Err(err) => {
471                warn!("couldn't save event in the event cache: {err}");
472            }
473        }
474    }
475
476    /// Return a nice debug string (a vector of lines) for the linked chunk of
477    /// events for this room.
478    pub async fn debug_string(&self) -> Vec<String> {
479        match self.inner.state.read().await {
480            Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
481            Err(err) => {
482                warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
483
484                vec![]
485            }
486        }
487    }
488}
489
/// The (non-cloneable) details of the `RoomEventCache`.
pub(super) struct RoomEventCacheInner {
    /// The room id for this room.
    pub(super) room_id: OwnedRoomId,

    /// Weak reference to the room, used to run network requests (e.g. thread
    /// pagination, fetching pinned events) without keeping the client alive.
    pub weak_room: WeakRoom,

    /// State for this room's event cache.
    pub state: RoomEventCacheStateLock,

    /// A notifier that we received a new pagination token.
    pub pagination_batch_token_notifier: Notify,

    /// Observable pagination status for this room, shared with
    /// [`RoomPagination`] consumers.
    pub pagination_status: SharedObservable<RoomPaginationStatus>,

    /// Sender to the auto-shrink channel.
    ///
    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
    /// more details.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Sender part for update subscribers to this room.
    pub update_sender: Sender<RoomEventCacheUpdate>,

    /// A clone of [`EventCacheInner::generic_update_sender`].
    ///
    /// Whilst `EventCacheInner` handles the generic updates from the sync, or
    /// the storage, it doesn't handle the update from pagination. Having a
    /// clone here allows to access it from [`RoomPagination`].
    pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
}
521
522impl RoomEventCacheInner {
523    /// Creates a new cache for a room, and subscribes to room updates, so as
524    /// to handle new timeline events.
525    fn new(
526        client: WeakClient,
527        state: RoomEventCacheStateLock,
528        pagination_status: SharedObservable<RoomPaginationStatus>,
529        room_id: OwnedRoomId,
530        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
531        update_sender: Sender<RoomEventCacheUpdate>,
532        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
533    ) -> Self {
534        let weak_room = WeakRoom::new(client, room_id);
535
536        Self {
537            room_id: weak_room.room_id().to_owned(),
538            weak_room,
539            state,
540            update_sender,
541            pagination_batch_token_notifier: Default::default(),
542            auto_shrink_sender,
543            pagination_status,
544            generic_update_sender,
545        }
546    }
547
548    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
549        if account_data.is_empty() {
550            return;
551        }
552
553        let mut handled_read_marker = false;
554
555        trace!("Handling account data");
556
557        for raw_event in account_data {
558            match raw_event.deserialize() {
559                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
560                    // If duplicated, do not forward read marker multiple times
561                    // to avoid clutter the update channel.
562                    if handled_read_marker {
563                        continue;
564                    }
565
566                    handled_read_marker = true;
567
568                    // Propagate to observers. (We ignore the error if there aren't any.)
569                    let _ = self.update_sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
570                        event_id: ev.content.event_id,
571                    });
572                }
573
574                Ok(_) => {
575                    // We're not interested in other room account data updates,
576                    // at this point.
577                }
578
579                Err(e) => {
580                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
581                    warn!(event_type, "Failed to deserialize account data: {e}");
582                }
583            }
584        }
585    }
586
587    #[instrument(skip_all, fields(room_id = %self.room_id))]
588    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
589        self.handle_timeline(
590            updates.timeline,
591            updates.ephemeral.clone(),
592            updates.ambiguity_changes,
593        )
594        .await?;
595        self.handle_account_data(updates.account_data);
596
597        Ok(())
598    }
599
600    #[instrument(skip_all, fields(room_id = %self.room_id))]
601    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
602        self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
603
604        Ok(())
605    }
606
607    /// Handle a [`Timeline`], i.e. new events received by a sync for this
608    /// room.
609    async fn handle_timeline(
610        &self,
611        timeline: Timeline,
612        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
613        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
614    ) -> Result<()> {
615        if timeline.events.is_empty()
616            && timeline.prev_batch.is_none()
617            && ephemeral_events.is_empty()
618            && ambiguity_changes.is_empty()
619        {
620            return Ok(());
621        }
622
623        // Add all the events to the backend.
624        trace!("adding new events");
625
626        let (stored_prev_batch_token, timeline_event_diffs) =
627            self.state.write().await?.handle_sync(timeline).await?;
628
629        // Now that all events have been added, we can trigger the
630        // `pagination_token_notifier`.
631        if stored_prev_batch_token {
632            self.pagination_batch_token_notifier.notify_one();
633        }
634
635        // The order matters here: first send the timeline event diffs, then only the
636        // related events (read receipts, etc.).
637        if !timeline_event_diffs.is_empty() {
638            let _ = self.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
639                diffs: timeline_event_diffs,
640                origin: EventsOrigin::Sync,
641            });
642
643            let _ = self
644                .generic_update_sender
645                .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
646        }
647
648        if !ephemeral_events.is_empty() {
649            let _ = self
650                .update_sender
651                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
652        }
653
654        if !ambiguity_changes.is_empty() {
655            let _ =
656                self.update_sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
657        }
658
659        Ok(())
660    }
661}
662
/// Internal type to represent the output of
/// [`RoomEventCacheState::load_more_events_backwards`].
#[derive(Debug)]
pub(super) enum LoadMoreEventsBackwardsOutcome {
    /// A gap has been inserted.
    Gap {
        /// The previous batch token to be used as the "end" parameter in the
        /// back-pagination request.
        prev_token: Option<String>,
    },

    /// The start of the timeline has been reached.
    StartOfTimeline,

    /// Events have been inserted.
    Events {
        /// The events that were loaded.
        events: Vec<Event>,
        /// The diffs describing how the timeline changed, to be forwarded to
        /// observers.
        timeline_event_diffs: Vec<VectorDiff<Event>>,
        /// Whether the start of the timeline was reached while loading.
        reached_start: bool,
    },
}
680
681// Use a private module to hide `events` to this parent module.
682mod private {
683    use std::{
684        collections::{BTreeMap, HashMap, HashSet},
685        sync::{
686            Arc, OnceLock,
687            atomic::{AtomicBool, AtomicUsize, Ordering},
688        },
689    };
690
691    use eyeball::SharedObservable;
692    use eyeball_im::VectorDiff;
693    use itertools::Itertools;
694    use matrix_sdk_base::{
695        apply_redaction,
696        deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
697        event_cache::{
698            Event, Gap,
699            store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState},
700        },
701        linked_chunk::{
702            ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
703            OwnedLinkedChunkId, Position, Update, lazy_loader,
704        },
705        serde_helpers::{extract_edit_target, extract_thread_root},
706        sync::Timeline,
707    };
708    use matrix_sdk_common::executor::spawn;
709    use ruma::{
710        EventId, OwnedEventId, OwnedRoomId, RoomId,
711        events::{
712            AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
713            relation::RelationType, room::redaction::SyncRoomRedactionEvent,
714        },
715        room_version_rules::RoomVersionRules,
716        serde::Raw,
717    };
718    use tokio::sync::{
719        Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
720        broadcast::{Receiver, Sender},
721    };
722    use tracing::{debug, error, instrument, trace, warn};
723
724    use super::{
725        super::{
726            BackPaginationOutcome, EventCacheError, RoomEventCacheLinkedChunkUpdate,
727            RoomPaginationStatus, ThreadEventCacheUpdate,
728            deduplicator::{DeduplicationOutcome, filter_duplicate_events},
729            room::threads::ThreadEventCache,
730        },
731        EventLocation, EventsOrigin, LoadMoreEventsBackwardsOutcome, RoomEventCacheGenericUpdate,
732        RoomEventCacheUpdate,
733        events::EventLinkedChunk,
734        sort_positions_descending,
735    };
736    use crate::{Room, event_cache::room::pinned_events::PinnedEventCache};
737
    /// State for a single room's event cache.
    ///
    /// This contains all the inner mutable states that ought to be updated at
    /// the same time.
    pub struct RoomEventCacheStateLock {
        /// The per-thread lock around the real state.
        ///
        /// All access to [`RoomEventCacheState`] goes through this lock.
        locked_state: RwLock<RoomEventCacheState>,

        /// A lock taken to avoid multiple attempts to upgrade from a read lock
        /// to a write lock.
        ///
        /// Please see inline comment of [`Self::read`] to understand why it
        /// exists.
        state_lock_upgrade_mutex: Mutex<()>,
    }
753
    struct RoomEventCacheState {
        /// Whether thread support has been enabled for the event cache.
        enabled_thread_support: bool,

        /// The room this state relates to.
        room_id: OwnedRoomId,

        /// Reference to the underlying backing store.
        store: EventCacheStoreLock,

        /// The loaded events for the current room, that is, the in-memory
        /// linked chunk for this room.
        room_linked_chunk: EventLinkedChunk,

        /// Threads present in this room.
        ///
        /// Keyed by the thread root event ID.
        threads: HashMap<OwnedEventId, ThreadEventCache>,

        /// Cache for pinned events in this room, initialized on-demand.
        pinned_event_cache: OnceLock<PinnedEventCache>,

        /// Observable pagination status for this room, shared with
        /// [`super::RoomEventCacheInner::pagination_status`].
        pagination_status: SharedObservable<RoomPaginationStatus>,

        /// A clone of [`super::RoomEventCacheInner::update_sender`].
        ///
        /// This is used only by the [`RoomEventCacheStateLock::read`] and
        /// [`RoomEventCacheStateLock::write`] when the state must be reset.
        update_sender: Sender<RoomEventCacheUpdate>,

        /// A clone of [`super::super::EventCacheInner::generic_update_sender`].
        ///
        /// This is used only by the [`RoomEventCacheStateLock::read`] and
        /// [`RoomEventCacheStateLock::write`] when the state must be reset.
        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

        /// A clone of
        /// [`super::super::EventCacheInner::linked_chunk_update_sender`].
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

        /// The rules for the version of this room.
        room_version_rules: RoomVersionRules,

        /// Have we ever waited for a previous-batch-token to come from sync, in
        /// the context of pagination? We do this at most once per room,
        /// the first time we try to run backward pagination. We reset
        /// that upon clearing the timeline events.
        waited_for_initial_prev_token: Arc<AtomicBool>,

        /// An atomic count of the current number of subscriber of the
        /// [`super::RoomEventCache`].
        subscriber_count: Arc<AtomicUsize>,
    }
807
    impl RoomEventCacheStateLock {
        /// Create a new state, or reload it from storage if it's been enabled.
        ///
        /// Not all events are going to be loaded. Only a portion of them. The
        /// [`EventLinkedChunk`] relies on a [`LinkedChunk`] to store all
        /// events. Only the last chunk will be loaded. It means the
        /// events are loaded from the most recent to the oldest. To
        /// load more events, see [`RoomPagination`].
        ///
        /// If loading the linked chunk metadata or its last chunk fails, the
        /// stored linked chunk is cleared and the state restarts from an empty
        /// linked chunk, rather than failing the whole construction.
        ///
        /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
        /// [`RoomPagination`]: super::RoomPagination
        #[allow(clippy::too_many_arguments)]
        pub async fn new(
            room_id: OwnedRoomId,
            room_version_rules: RoomVersionRules,
            enabled_thread_support: bool,
            update_sender: Sender<RoomEventCacheUpdate>,
            generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
            linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
            store: EventCacheStoreLock,
            pagination_status: SharedObservable<RoomPaginationStatus>,
        ) -> Result<Self, EventCacheError> {
            let store_guard = match store.lock().await? {
                // Lock is clean: all good!
                EventCacheStoreLockState::Clean(guard) => guard,

                // Lock is dirty, not a problem, it's the first time we are creating this state, no
                // need to refresh.
                EventCacheStoreLockState::Dirty(guard) => {
                    EventCacheStoreLockGuard::clear_dirty(&guard);

                    guard
                }
            };

            let linked_chunk_id = LinkedChunkId::Room(&room_id);

            // Load the full linked chunk's metadata, so as to feed the order tracker.
            //
            // If loading the full linked chunk failed, we'll clear the event cache, as it
            // indicates that at some point, there's some malformed data.
            let full_linked_chunk_metadata =
                match load_linked_chunk_metadata(&store_guard, linked_chunk_id).await {
                    Ok(metas) => metas,
                    Err(err) => {
                        error!(
                            "error when loading a linked chunk's metadata from the store: {err}"
                        );

                        // Try to clear storage for this room.
                        store_guard
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        // Restart with an empty linked chunk.
                        None
                    }
                };

            // Load only the *last* chunk of events from the store; older chunks are
            // lazy-loaded later (see `load_more_events_backwards`).
            let linked_chunk = match store_guard
                .load_last_chunk(linked_chunk_id)
                .await
                .map_err(EventCacheError::from)
                .and_then(|(last_chunk, chunk_identifier_generator)| {
                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
                        .map_err(EventCacheError::from)
                }) {
                Ok(linked_chunk) => linked_chunk,
                Err(err) => {
                    error!(
                        "error when loading a linked chunk's latest chunk from the store: {err}"
                    );

                    // Try to clear storage for this room.
                    store_guard
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    None
                }
            };

            let waited_for_initial_prev_token = Arc::new(AtomicBool::new(false));

            Ok(Self {
                locked_state: RwLock::new(RoomEventCacheState {
                    enabled_thread_support,
                    room_id,
                    store,
                    room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
                        linked_chunk,
                        full_linked_chunk_metadata,
                    ),
                    // The threads mapping is intentionally empty at start, since we're going to
                    // reload threads lazily, as soon as we need to (based on external
                    // subscribers) or when we get new information about those (from
                    // sync).
                    threads: HashMap::new(),
                    pagination_status,
                    update_sender,
                    generic_update_sender,
                    linked_chunk_update_sender,
                    room_version_rules,
                    waited_for_initial_prev_token,
                    // No subscribers yet: the count starts at zero.
                    subscriber_count: Default::default(),
                    // Initialized lazily, on first use.
                    pinned_event_cache: OnceLock::new(),
                }),
                state_lock_upgrade_mutex: Mutex::new(()),
            })
        }

        /// Lock this [`RoomEventCacheStateLock`] with per-thread shared access.
        ///
        /// This method locks the per-thread lock over the state, and then locks
        /// the cross-process lock over the store. It returns an RAII guard
        /// which will drop the read access to the state and to the store when
        /// dropped.
        ///
        /// If the cross-process lock over the store is dirty (see
        /// [`EventCacheStoreLockState`]), the state is reset to the last chunk.
        pub async fn read(&self) -> Result<RoomEventCacheStateLockReadGuard<'_>, EventCacheError> {
            // Only one call at a time to `read` is allowed.
            //
            // Why? Because in case the cross-process lock over the store is dirty, we need
            // to upgrade the read lock over the state to a write lock.
            //
            // ## Upgradable read lock
            //
            // One may argue that this upgrade can be done with an _upgradable read lock_
            // [^1] [^2]. We don't want to use this solution: an upgradable read lock is
            // basically a mutex because we are losing the shared access property, i.e.
            // having multiple read locks at the same time. This is an important property to
            // hold for performance concerns.
            //
            // ## Downgradable write lock
            //
            // One may also argue we could first obtain a write lock over the state from the
            // beginning, thus removing the need to upgrade the read lock to a write lock.
            // The write lock is then downgraded to a read lock once the dirty is cleaned
            // up. It can potentially create a deadlock in the following situation:
            //
            // - `read` is called once, it takes a write lock, then downgrades it to a read
            //   lock: the guard is kept alive somewhere,
            // - `read` is called again, and waits to obtain the write lock, which is
            //   impossible as long as the guard from the previous call is not dropped.
            //
            // ## “Atomic” read and write
            //
            // One may finally argue to first obtain a read lock over the state, then drop
            // it if the cross-process lock over the store is dirty, and immediately obtain
            // a write lock (which can later be downgraded to a read lock). The problem is
            // that this write lock is async: anything can happen between the drop and the
            // new lock acquisition, and it's not possible to pause the runtime in the
            // meantime.
            //
            // ## Semaphore with 1 permit, aka a Mutex
            //
            // The chosen idea is to allow only one execution at a time of this method: it
            // becomes a critical section. That way we are free to “upgrade” the read lock
            // by dropping it and obtaining a new write lock. All callers to this method are
            // waiting, so nothing can happen in the meantime.
            //
            // Note that it doesn't conflict with the `write` method because the latter
            // immediately obtains a write lock, which avoids any conflict with this method.
            //
            // [^1]: https://docs.rs/lock_api/0.4.14/lock_api/struct.RwLock.html#method.upgradable_read
            // [^2]: https://docs.rs/async-lock/3.4.1/async_lock/struct.RwLock.html#method.upgradable_read
            let _state_lock_upgrade_guard = self.state_lock_upgrade_mutex.lock().await;

            // Obtain a read lock.
            let state_guard = self.locked_state.read().await;

            match state_guard.store.lock().await? {
                EventCacheStoreLockState::Clean(store_guard) => {
                    Ok(RoomEventCacheStateLockReadGuard { state: state_guard, store: store_guard })
                }
                EventCacheStoreLockState::Dirty(store_guard) => {
                    // Drop the read lock, and take a write lock to modify the state.
                    // This is safe because only one reader at a time (see
                    // `Self::state_lock_upgrade_mutex`) is allowed.
                    drop(state_guard);
                    let state_guard = self.locked_state.write().await;

                    let mut guard = RoomEventCacheStateLockWriteGuard {
                        state: state_guard,
                        store: store_guard,
                    };

                    // Force to reload by shrinking to the last chunk.
                    let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;

                    // All good now, mark the cross-process lock as non-dirty.
                    EventCacheStoreLockGuard::clear_dirty(&guard.store);

                    // Downgrade the guard as soon as possible.
                    let guard = guard.downgrade();

                    // Now let the world know about the reload.
                    if !updates_as_vector_diffs.is_empty() {
                        // Notify observers about the update.
                        let _ = guard.state.update_sender.send(
                            RoomEventCacheUpdate::UpdateTimelineEvents {
                                diffs: updates_as_vector_diffs,
                                origin: EventsOrigin::Cache,
                            },
                        );

                        // Notify observers about the generic update.
                        let _ =
                            guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
                                room_id: guard.state.room_id.clone(),
                            });
                    }

                    Ok(guard)
                }
            }
        }

        /// Lock this [`RoomEventCacheStateLock`] with exclusive per-thread
        /// write access.
        ///
        /// This method locks the per-thread lock over the state, and then locks
        /// the cross-process lock over the store. It returns an RAII guard
        /// which will drop the write access to the state and to the store when
        /// dropped.
        ///
        /// If the cross-process lock over the store is dirty (see
        /// [`EventCacheStoreLockState`]), the state is reset to the last chunk.
        pub async fn write(
            &self,
        ) -> Result<RoomEventCacheStateLockWriteGuard<'_>, EventCacheError> {
            // Contrary to `read`, no upgrade is ever needed here: we hold the exclusive
            // write lock from the start, so the dirty case can be handled in place.
            let state_guard = self.locked_state.write().await;

            match state_guard.store.lock().await? {
                EventCacheStoreLockState::Clean(store_guard) => {
                    Ok(RoomEventCacheStateLockWriteGuard { state: state_guard, store: store_guard })
                }
                EventCacheStoreLockState::Dirty(store_guard) => {
                    let mut guard = RoomEventCacheStateLockWriteGuard {
                        state: state_guard,
                        store: store_guard,
                    };

                    // Force to reload by shrinking to the last chunk.
                    let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;

                    // All good now, mark the cross-process lock as non-dirty.
                    EventCacheStoreLockGuard::clear_dirty(&guard.store);

                    // Now let the world know about the reload.
                    if !updates_as_vector_diffs.is_empty() {
                        // Notify observers about the update.
                        let _ = guard.state.update_sender.send(
                            RoomEventCacheUpdate::UpdateTimelineEvents {
                                diffs: updates_as_vector_diffs,
                                origin: EventsOrigin::Cache,
                            },
                        );

                        // Notify observers about the generic update.
                        let _ =
                            guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
                                room_id: guard.state.room_id.clone(),
                            });
                    }

                    Ok(guard)
                }
            }
        }
    }
1080
    /// The read lock guard returned by [`RoomEventCacheStateLock::read`].
    ///
    /// Dropping this guard releases both the per-thread read access to the
    /// state and the cross-process lock over the store.
    pub struct RoomEventCacheStateLockReadGuard<'a> {
        /// The per-thread read lock guard over the [`RoomEventCacheState`].
        state: RwLockReadGuard<'a, RoomEventCacheState>,

        /// The cross-process lock guard over the store.
        store: EventCacheStoreLockGuard,
    }
1089
    /// The write lock guard returned by [`RoomEventCacheStateLock::write`].
    ///
    /// Dropping this guard releases both the per-thread write access to the
    /// state and the cross-process lock over the store.
    pub struct RoomEventCacheStateLockWriteGuard<'a> {
        /// The per-thread write lock guard over the [`RoomEventCacheState`].
        state: RwLockWriteGuard<'a, RoomEventCacheState>,

        /// The cross-process lock guard over the store.
        store: EventCacheStoreLockGuard,
    }
1098
1099    impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
1100        /// Synchronously downgrades a write lock into a read lock.
1101        ///
1102        /// The per-thread/state lock is downgraded atomically, without allowing
1103        /// any writers to take exclusive access of the lock in the meantime.
1104        ///
1105        /// It returns an RAII guard which will drop the write access to the
1106        /// state and to the store when dropped.
1107        fn downgrade(self) -> RoomEventCacheStateLockReadGuard<'a> {
1108            RoomEventCacheStateLockReadGuard { state: self.state.downgrade(), store: self.store }
1109        }
1110    }
1111
    impl<'a> RoomEventCacheStateLockReadGuard<'a> {
        /// Returns a read-only reference to the underlying room linked chunk.
        pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
            &self.state.room_linked_chunk
        }

        /// Returns the atomic count of the current number of subscribers of
        /// the [`super::RoomEventCache`].
        pub fn subscriber_count(&self) -> &Arc<AtomicUsize> {
            &self.state.subscriber_count
        }

        /// Find a single event in this room.
        ///
        /// It starts by looking into loaded events in `EventLinkedChunk` before
        /// looking inside the storage.
        pub async fn find_event(
            &self,
            event_id: &EventId,
        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
            find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
                .await
        }

        /// Find an event and all its relations in the persisted storage.
        ///
        /// This goes straight to the database, as a simplification; we don't
        /// expect to need to have to look up in memory events, or that
        /// all the related events are actually loaded.
        ///
        /// The related events are sorted like this:
        /// - events saved out-of-band with
        ///   [`super::RoomEventCache::save_events`] will be located at the
        ///   beginning of the array.
        /// - events present in the linked chunk (be it in memory or in the
        ///   database) will be sorted according to their ordering in the linked
        ///   chunk.
        pub async fn find_event_with_relations(
            &self,
            event_id: &EventId,
            filters: Option<Vec<RelationType>>,
        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
            find_event_with_relations(
                event_id,
                &self.state.room_id,
                filters,
                &self.state.room_linked_chunk,
                &self.store,
            )
            .await
        }

        /// Find all relations for an event in the persisted storage.
        ///
        /// This goes straight to the database, as a simplification; we don't
        /// expect to need to have to look up in memory events, or that
        /// all the related events are actually loaded.
        ///
        /// The related events are sorted like this:
        /// - events saved out-of-band with
        ///   [`super::RoomEventCache::save_events`] will be located at the
        ///   beginning of the array.
        /// - events present in the linked chunk (be it in memory or in the
        ///   database) will be sorted according to their ordering in the linked
        ///   chunk.
        pub async fn find_event_relations(
            &self,
            event_id: &EventId,
            filters: Option<Vec<RelationType>>,
        ) -> Result<Vec<Event>, EventCacheError> {
            find_event_relations(
                event_id,
                &self.state.room_id,
                filters,
                &self.state.room_linked_chunk,
                &self.store,
            )
            .await
        }

        /// Find a single event in this room, starting from the most recent
        /// event.
        ///
        /// The `predicate` receives two arguments: the current event, and the
        /// _previous_ (older) event, if any.
        ///
        /// **Warning**! It looks into the loaded events from the in-memory
        /// linked chunk **only**. It doesn't look inside the storage,
        /// contrary to [`Self::find_event`].
        pub fn rfind_map_event_in_memory_by<'i, O, P>(&'i self, mut predicate: P) -> Option<O>
        where
            P: FnMut(&'i Event, Option<&'i Event>) -> Option<O>,
        {
            // Iterate in reverse order, pairing each event with its successor in the
            // reversed iteration (i.e. the chronologically older event), via `peek`.
            self.state
                .room_linked_chunk
                .revents()
                .peekable()
                .batching(|iter| {
                    iter.next().map(|(_position, event)| {
                        (event, iter.peek().map(|(_next_position, next_event)| *next_event))
                    })
                })
                .find_map(|(event, next_event)| predicate(event, next_event))
        }

        /// Returns whether the cross-process lock over the store is dirty.
        ///
        /// Test-only helper.
        #[cfg(test)]
        pub fn is_dirty(&self) -> bool {
            EventCacheStoreLockGuard::is_dirty(&self.store)
        }
    }
1219
1220    impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
        /// Returns a write reference to the underlying room linked chunk.
        ///
        /// Only compiled for tests and with the `e2e-encryption` feature.
        #[cfg(any(feature = "e2e-encryption", test))]
        pub fn room_linked_chunk(&mut self) -> &mut EventLinkedChunk {
            &mut self.state.room_linked_chunk
        }
1226
        /// Get a reference to the pinned event cache, if it has been
        /// initialized.
        ///
        /// The cache lives in a `OnceLock` and is initialized on-demand, so
        /// this returns `None` until it has been set.
        #[cfg(any(feature = "e2e-encryption", test))]
        pub fn pinned_event_cache(&self) -> Option<&PinnedEventCache> {
            self.state.pinned_event_cache.get()
        }
1233
        /// Get a reference to the `waited_for_initial_prev_token` atomic bool.
        ///
        /// This flag records whether we have ever waited for a
        /// previous-batch-token to come from sync, in the context of
        /// pagination; it is set at most once per room and reset when the
        /// timeline events are cleared.
        pub fn waited_for_initial_prev_token(&self) -> &Arc<AtomicBool> {
            &self.state.waited_for_initial_prev_token
        }
1238
1239        /// Find a single event in this room.
1240        ///
1241        /// It starts by looking into loaded events in `EventLinkedChunk` before
1242        /// looking inside the storage.
1243        pub async fn find_event(
1244            &self,
1245            event_id: &EventId,
1246        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1247            find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1248                .await
1249        }
1250
1251        /// Find an event and all its relations in the persisted storage.
1252        ///
1253        /// This goes straight to the database, as a simplification; we don't
1254        /// expect to need to have to look up in memory events, or that
1255        /// all the related events are actually loaded.
1256        ///
1257        /// The related events are sorted like this:
1258        /// - events saved out-of-band with
1259        ///   [`super::RoomEventCache::save_events`] will be located at the
1260        ///   beginning of the array.
1261        /// - events present in the linked chunk (be it in memory or in the
1262        ///   database) will be sorted according to their ordering in the linked
1263        ///   chunk.
1264        pub async fn find_event_with_relations(
1265            &self,
1266            event_id: &EventId,
1267            filters: Option<Vec<RelationType>>,
1268        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1269            find_event_with_relations(
1270                event_id,
1271                &self.state.room_id,
1272                filters,
1273                &self.state.room_linked_chunk,
1274                &self.store,
1275            )
1276            .await
1277        }
1278
        /// Load more events backwards if the last chunk is **not** a gap.
        ///
        /// Possible outcomes:
        /// - `Gap { prev_token: Some(_) }` when an in-memory gap (or a
        ///   gap chunk reloaded from disk) must be resolved by the caller,
        /// - `Gap { prev_token: None }` when the room has no events at all, so
        ///   back-pagination should start from the end of the room,
        /// - `StartOfTimeline` when the whole timeline is loaded,
        /// - `Events { .. }` when a chunk of events was reloaded from disk.
        pub async fn load_more_events_backwards(
            &mut self,
        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
            // If any in-memory chunk is a gap, don't load more events, and let the caller
            // resolve the gap.
            if let Some(prev_token) = self.state.room_linked_chunk.rgap().map(|gap| gap.prev_token)
            {
                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
            }

            let prev_first_chunk = self
                .state
                .room_linked_chunk
                .chunks()
                .next()
                .expect("a linked chunk is never empty");

            // The first chunk is not a gap, we can load its previous chunk.
            let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
            let new_first_chunk = match self
                .store
                .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
                .await
            {
                Ok(Some(new_first_chunk)) => {
                    // All good, let's continue with this chunk.
                    new_first_chunk
                }

                Ok(None) => {
                    // If we never received events for this room, this means we've never received a
                    // sync for that room, because every room must have *at least* a room creation
                    // event. Otherwise, we have reached the start of the timeline.

                    if self.state.room_linked_chunk.events().next().is_some() {
                        // If there's at least one event, this means we've reached the start of the
                        // timeline, since the chunk is fully loaded.
                        trace!("chunk is fully loaded and non-empty: reached_start=true");
                        return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
                    }

                    // Otherwise, start back-pagination from the end of the room.
                    return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: None });
                }

                Err(err) => {
                    error!("error when loading the previous chunk of a linked chunk: {err}");

                    // Clear storage for this room.
                    self.store
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    // Return the error.
                    return Err(err.into());
                }
            };

            // Clone the content now: `new_first_chunk` is moved into the linked chunk
            // below, but we still need the content to build the outcome.
            let chunk_content = new_first_chunk.content.clone();

            // We've reached the start on disk, if and only if, there was no chunk prior to
            // the one we just loaded.
            //
            // This value is correct, if and only if, it is used for a chunk content of kind
            // `Items`.
            let reached_start = new_first_chunk.previous.is_none();

            if let Err(err) =
                self.state.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk)
            {
                error!("error when inserting the previous chunk into its linked chunk: {err}");

                // Clear storage for this room.
                self.store
                    .handle_linked_chunk_updates(
                        LinkedChunkId::Room(&self.state.room_id),
                        vec![Update::Clear],
                    )
                    .await?;

                // Return the error.
                return Err(err.into());
            }

            // ⚠️ Let's not propagate the updates to the store! We already have these data
            // in the store! Let's drain them.
            let _ = self.state.room_linked_chunk.store_updates().take();

            // However, we want to get updates as `VectorDiff`s.
            let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

            Ok(match chunk_content {
                ChunkContent::Gap(gap) => {
                    trace!("reloaded chunk from disk (gap)");
                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
                }

                ChunkContent::Items(events) => {
                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
                    LoadMoreEventsBackwardsOutcome::Events {
                        events,
                        timeline_event_diffs,
                        reached_start,
                    }
                }
            })
        }
1387
        /// Unload all the chunks, then reload only the last one.
        ///
        /// If loading the last chunk from the store fails, the stored linked
        /// chunk is cleared and the in-memory one restarts empty.
        ///
        /// The store updates produced by this operation are drained on
        /// purpose: the data is already persisted. Callers that need the
        /// corresponding `VectorDiff` updates must collect them with
        /// `updates_as_vector_diffs` afterwards, as done by
        /// [`Self::force_shrink_to_last_chunk`].
        pub async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
            // Attempt to load the last chunk.
            let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
            let (last_chunk, chunk_identifier_generator) =
                match self.store.load_last_chunk(linked_chunk_id).await {
                    Ok(pair) => pair,

                    Err(err) => {
                        // If loading the last chunk failed, clear the entire linked chunk.
                        error!("error when reloading a linked chunk from memory: {err}");

                        // Clear storage for this room.
                        self.store
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        // Restart with an empty linked chunk.
                        (None, ChunkIdentifierGenerator::new_from_scratch())
                    }
                };

            debug!("unloading the linked chunk, and resetting it to its last chunk");

            // Remove all the chunks from the linked chunks, except for the last one, and
            // updates the chunk identifier generator.
            if let Err(err) =
                self.state.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
            {
                error!("error when replacing the linked chunk: {err}");
                return self.reset_internal().await;
            }

            // Let pagination observers know that we may have not reached the start of the
            // timeline.
            // TODO: likely need to cancel any ongoing pagination.
            self.state
                .pagination_status
                .set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            // Don't propagate those updates to the store; this is only for the in-memory
            // representation that we're doing this. Let's drain those store updates.
            let _ = self.state.room_linked_chunk.store_updates().take();

            Ok(())
        }
1441
1442        /// Automatically shrink the room if there are no more subscribers, as
1443        /// indicated by the atomic number of active subscribers.
1444        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1445        pub async fn auto_shrink_if_no_subscribers(
1446            &mut self,
1447        ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
1448            let subscriber_count = self.state.subscriber_count.load(Ordering::SeqCst);
1449
1450            trace!(subscriber_count, "received request to auto-shrink");
1451
1452            if subscriber_count == 0 {
1453                // If we are the last strong reference to the auto-shrinker, we can shrink the
1454                // events data structure to its last chunk.
1455                self.shrink_to_last_chunk().await?;
1456
1457                Ok(Some(self.state.room_linked_chunk.updates_as_vector_diffs()))
1458            } else {
1459                Ok(None)
1460            }
1461        }
1462
        /// Force the room's in-memory events to shrink to their last chunk,
        /// whether there are subscribers or not.
        ///
        /// Returns the `VectorDiff` updates caused by the shrink, which the
        /// caller is responsible for propagating to observers.
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn force_shrink_to_last_chunk(
            &mut self,
        ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
            self.shrink_to_last_chunk().await?;

            Ok(self.state.room_linked_chunk.updates_as_vector_diffs())
        }
1472
1473        /// Remove events by their position, in `EventLinkedChunk` and in
1474        /// `EventCacheStore`.
1475        ///
1476        /// This method is purposely isolated because it must ensure that
1477        /// positions are sorted appropriately or it can be disastrous.
1478        #[instrument(skip_all)]
1479        pub async fn remove_events(
1480            &mut self,
1481            in_memory_events: Vec<(OwnedEventId, Position)>,
1482            in_store_events: Vec<(OwnedEventId, Position)>,
1483        ) -> Result<(), EventCacheError> {
1484            // In-store events.
1485            if !in_store_events.is_empty() {
1486                let mut positions = in_store_events
1487                    .into_iter()
1488                    .map(|(_event_id, position)| position)
1489                    .collect::<Vec<_>>();
1490
1491                sort_positions_descending(&mut positions);
1492
1493                let updates = positions
1494                    .into_iter()
1495                    .map(|pos| Update::RemoveItem { at: pos })
1496                    .collect::<Vec<_>>();
1497
1498                self.apply_store_only_updates(updates).await?;
1499            }
1500
1501            // In-memory events.
1502            if in_memory_events.is_empty() {
1503                // Nothing else to do, return early.
1504                return Ok(());
1505            }
1506
1507            // `remove_events_by_position` is responsible of sorting positions.
1508            self.state
1509                .room_linked_chunk
1510                .remove_events_by_position(
1511                    in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
1512                )
1513                .expect("failed to remove an event");
1514
1515            self.propagate_changes().await
1516        }
1517
1518        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1519            let updates = self.state.room_linked_chunk.store_updates().take();
1520            self.send_updates_to_store(updates).await
1521        }
1522
1523        /// Apply some updates that are effective only on the store itself.
1524        ///
1525        /// This method should be used only for updates that happen *outside*
1526        /// the in-memory linked chunk. Such updates must be applied
1527        /// onto the ordering tracker as well as to the persistent
1528        /// storage.
1529        async fn apply_store_only_updates(
1530            &mut self,
1531            updates: Vec<Update<Event, Gap>>,
1532        ) -> Result<(), EventCacheError> {
1533            self.state.room_linked_chunk.order_tracker.map_updates(&updates);
1534            self.send_updates_to_store(updates).await
1535        }
1536
1537        async fn send_updates_to_store(
1538            &mut self,
1539            mut updates: Vec<Update<Event, Gap>>,
1540        ) -> Result<(), EventCacheError> {
1541            if updates.is_empty() {
1542                return Ok(());
1543            }
1544
1545            // Strip relations from updates which insert or replace items.
1546            for update in updates.iter_mut() {
1547                match update {
1548                    Update::PushItems { items, .. } => strip_relations_from_events(items),
1549                    Update::ReplaceItem { item, .. } => strip_relations_from_event(item),
1550                    // Other update kinds don't involve adding new events.
1551                    Update::NewItemsChunk { .. }
1552                    | Update::NewGapChunk { .. }
1553                    | Update::RemoveChunk(_)
1554                    | Update::RemoveItem { .. }
1555                    | Update::DetachLastItems { .. }
1556                    | Update::StartReattachItems
1557                    | Update::EndReattachItems
1558                    | Update::Clear => {}
1559                }
1560            }
1561
1562            // Spawn a task to make sure that all the changes are effectively forwarded to
1563            // the store, even if the call to this method gets aborted.
1564            //
1565            // The store cross-process locking involves an actual mutex, which ensures that
1566            // storing updates happens in the expected order.
1567
1568            let store = self.store.clone();
1569            let room_id = self.state.room_id.clone();
1570            let cloned_updates = updates.clone();
1571
1572            spawn(async move {
1573                trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
1574                let linked_chunk_id = LinkedChunkId::Room(&room_id);
1575                store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
1576                trace!("linked chunk updates applied");
1577
1578                super::Result::Ok(())
1579            })
1580            .await
1581            .expect("joining failed")?;
1582
1583            // Forward that the store got updated to observers.
1584            let _ = self.state.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
1585                linked_chunk_id: OwnedLinkedChunkId::Room(self.state.room_id.clone()),
1586                updates,
1587            });
1588
1589            Ok(())
1590        }
1591
1592        /// Reset this data structure as if it were brand new.
1593        ///
1594        /// Return a single diff update that is a clear of all events; as a
1595        /// result, the caller may override any pending diff updates
1596        /// with the result of this function.
1597        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1598            self.reset_internal().await?;
1599
1600            let diff_updates = self.state.room_linked_chunk.updates_as_vector_diffs();
1601
1602            // Ensure the contract defined in the doc comment is true:
1603            debug_assert_eq!(diff_updates.len(), 1);
1604            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1605
1606            Ok(diff_updates)
1607        }
1608
1609        async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
1610            self.state.room_linked_chunk.reset();
1611
1612            // No need to update the thread summaries: the room events are
1613            // gone because of the reset of `room_linked_chunk`.
1614            //
1615            // Clear the threads.
1616            for thread in self.state.threads.values_mut() {
1617                thread.clear();
1618            }
1619
1620            self.propagate_changes().await?;
1621
1622            // Reset the pagination state too: pretend we never waited for the initial
1623            // prev-batch token, and indicate that we're not at the start of the
1624            // timeline, since we don't know about that anymore.
1625            self.state.waited_for_initial_prev_token.store(false, Ordering::SeqCst);
1626            // TODO: likely must cancel any ongoing back-paginations too
1627            self.state
1628                .pagination_status
1629                .set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1630
1631            Ok(())
1632        }
1633
        /// Handle the result of a sync.
        ///
        /// It may send room event cache updates to the given sender, if it
        /// generated any of those.
        ///
        /// Returns `true` for the first part of the tuple if a new gap
        /// (previous-batch token) has been inserted, `false` otherwise.
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn handle_sync(
            &mut self,
            mut timeline: Timeline,
        ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
            let mut prev_batch = timeline.prev_batch.take();

            // Split the sync events into genuinely-new events and those we already
            // know about (in memory or in the store).
            let DeduplicationOutcome {
                all_events: events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
                non_empty_all_duplicates: all_duplicates,
            } = filter_duplicate_events(
                &self.store,
                LinkedChunkId::Room(&self.state.room_id),
                &self.state.room_linked_chunk,
                timeline.events,
            )
            .await?;

            // If the timeline isn't limited, and we already knew about some past events,
            // then this definitely knows what the timeline head is (either we know
            // about all the events persisted in storage, or we have a gap
            // somewhere). In this case, we can ditch the previous-batch
            // token, which is an optimization to avoid unnecessary future back-pagination
            // requests.
            //
            // We can also ditch it if we knew about all the events that came from sync,
            // namely, they were all deduplicated. In this case, using the
            // previous-batch token would only result in fetching other events we
            // knew about. This is slightly incorrect in the presence of
            // network splits, but this has shown to be Good Enough™.
            //
            // Note: `&&` binds tighter than `||`, so this reads as
            // `(!limited && known-past-events) || all_duplicates`.
            if !timeline.limited && self.state.room_linked_chunk.events().next().is_some()
                || all_duplicates
            {
                prev_batch = None;
            }

            if prev_batch.is_some() {
                // Sad time: there's a gap, somewhere, in the timeline, and there's at least one
                // non-duplicated event. We don't know which threads might have gaps, so we
                // must invalidate them all :(
                // TODO(bnjbvr): figure out a better catchup mechanism for threads.
                let mut summaries_to_update = Vec::new();

                for (thread_root, thread) in self.state.threads.iter_mut() {
                    // Empty the thread's linked chunk.
                    thread.clear();

                    summaries_to_update.push(thread_root.clone());
                }

                // Now, update the summaries to indicate that we're not sure what the latest
                // thread event is. The thread count can remain as is, as it might still be
                // valid, and there's no good value to reset it to, anyways.
                for thread_root in summaries_to_update {
                    let Some((location, mut target_event)) = self.find_event(&thread_root).await?
                    else {
                        trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
                        continue;
                    };

                    if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
                        prev_summary.latest_reply = None;

                        target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);

                        self.replace_event_at(location, target_event).await?;
                    }
                }
            }

            if all_duplicates {
                // No new events and no gap (per the previous check), thus no need to change the
                // room state. We're done!
                return Ok((false, Vec::new()));
            }

            let has_new_gap = prev_batch.is_some();

            // If we've never waited for an initial previous-batch token, and we've now
            // inserted a gap, no need to wait for a previous-batch token later.
            if !self.state.waited_for_initial_prev_token.load(Ordering::SeqCst) && has_new_gap {
                self.state.waited_for_initial_prev_token.store(true, Ordering::SeqCst);
            }

            // Remove the old duplicated events.
            //
            // We don't have to worry that the removals can change the position of the
            // existing events, because we are pushing all _new_ `events` at the back.
            self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;

            // Append the new events (and the gap, if any) at the end of the timeline.
            self.state
                .room_linked_chunk
                .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);

            // `is_sync = true`: these events come from sync, so they may be routed to
            // threads as well.
            self.post_process_new_events(events, true).await?;

            if timeline.limited && has_new_gap {
                // If there was a previous batch token for a limited timeline, unload the chunks
                // so it only contains the last one; otherwise, there might be a
                // valid gap in between, and observers may not render it (yet).
                //
                // We must do this *after* persisting these events to storage (in
                // `post_process_new_events`).
                self.shrink_to_last_chunk().await?;
            }

            let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

            Ok((has_new_gap, timeline_event_diffs))
        }
1754
        /// Handle the result of a single back-pagination request.
        ///
        /// If the `prev_token` is set, then this function will check that the
        /// corresponding gap is present in the in-memory linked chunk.
        /// If it's not the case, `Ok(None)` will be returned, and the
        /// caller may decide to do something based on that (e.g. restart a
        /// pagination).
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn handle_backpagination(
            &mut self,
            events: Vec<Event>,
            mut new_token: Option<String>,
            prev_token: Option<String>,
        ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
        {
            // Check that the previous token still exists; otherwise it's a sign that the
            // room's timeline has been cleared.
            let prev_gap_id = if let Some(token) = prev_token {
                // Find the corresponding gap in the in-memory linked chunk.
                let gap_chunk_id = self.state.room_linked_chunk.chunk_identifier(|chunk| {
                    matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
                });

                if gap_chunk_id.is_none() {
                    // We got a previous-batch token from the linked chunk *before* running the
                    // request, but it is missing *after* completing the request.
                    //
                    // It may be a sign the linked chunk has been reset, but it's fine, per this
                    // function's contract.
                    return Ok(None);
                }

                gap_chunk_id
            } else {
                None
            };

            // Split the paginated events into genuinely-new events and duplicates.
            let DeduplicationOutcome {
                all_events: mut events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
                non_empty_all_duplicates: all_duplicates,
            } = filter_duplicate_events(
                &self.store,
                LinkedChunkId::Room(&self.state.room_id),
                &self.state.room_linked_chunk,
                events,
            )
            .await?;

            // If not all the events have been back-paginated, we need to remove the
            // previous ones, otherwise we can end up with misordered events.
            //
            // Consider the following scenario:
            // - sync returns [D, E, F]
            // - then sync returns [] with a previous batch token PB1, so the internal
            //   linked chunk state is [D, E, F, PB1].
            // - back-paginating with PB1 may return [A, B, C, D, E, F].
            //
            // Only inserting the new events when replacing PB1 would result in a timeline
            // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
            // all the events, in case this happens (see also #4746).

            if !all_duplicates {
                // Let's forget all the previous events.
                self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                    .await?;
            } else {
                // All new events are duplicated, they can all be ignored.
                events.clear();
                // The gap can be ditched too, as it won't be useful to backpaginate any
                // further.
                new_token = None;
            }

            // `/messages` has been called with `dir=b` (backwards), so the events are in
            // the inverted order; reorder them.
            let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();

            let new_gap = new_token.map(|prev_token| Gap { prev_token });
            let reached_start = self.state.room_linked_chunk.finish_back_pagination(
                prev_gap_id,
                new_gap,
                &topo_ordered_events,
            );

            // Note: this flushes updates to the store.
            // `is_sync = false`: backpaginated events can't be routed to threads, since
            // we wouldn't know where to insert them.
            self.post_process_new_events(topo_ordered_events, false).await?;

            let event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

            // Note: `events` is returned in the original (reverse-topological) order
            // received from the server.
            Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
        }
1848
1849        /// Subscribe to thread for a given root event, and get a (maybe empty)
1850        /// initially known list of events for that thread.
1851        pub fn subscribe_to_thread(
1852            &mut self,
1853            root: OwnedEventId,
1854        ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
1855            self.get_or_reload_thread(root).subscribe()
1856        }
1857
        /// Subscribe to the lazily initialized pinned event cache for this
        /// room.
        ///
        /// This is a persisted view over the pinned events of a room. The
        /// initially known pinned events are returned, and a network
        /// request to fetch the latest pinned events may be performed
        /// to update them as needed. The list of pinned events will also
        /// be kept up-to-date as new events are pinned, and new related
        /// events show up from sync or backpagination.
        ///
        /// This requires that the room's event cache be initialized.
        pub async fn subscribe_to_pinned_events(
            &mut self,
            room: Room,
        ) -> Result<(Vec<Event>, Receiver<RoomEventCacheUpdate>), EventCacheError> {
            // Lazily create the pinned event cache on first subscription; subsequent
            // calls reuse the same instance.
            let pinned_event_cache = self.state.pinned_event_cache.get_or_init(|| {
                PinnedEventCache::new(
                    room,
                    self.state.linked_chunk_update_sender.clone(),
                    self.state.store.clone(),
                )
            });

            pinned_event_cache.subscribe().await
        }
1883
        /// Finish a network back-pagination for the given thread, handing the
        /// received events and tokens over to the thread's event cache.
        ///
        /// Returns the back-pagination outcome, or `None` (per the underlying
        /// `finish_network_pagination`) — presumably when the pagination
        /// couldn't be applied, e.g. a stale `prev_token`; TODO confirm.
        pub fn finish_thread_network_pagination(
            &mut self,
            root: OwnedEventId,
            prev_token: Option<String>,
            new_token: Option<String>,
            events: Vec<Event>,
        ) -> Option<BackPaginationOutcome> {
            self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
        }
1896
1897        pub fn load_more_thread_events_backwards(
1898            &mut self,
1899            root: OwnedEventId,
1900        ) -> LoadMoreEventsBackwardsOutcome {
1901            self.get_or_reload_thread(root).load_more_events_backwards()
1902        }
1903
1904        // --------------------------------------------
1905        // utility methods
1906        // --------------------------------------------
1907
        /// Post-process new events, after they have been added to the in-memory
        /// linked chunk.
        ///
        /// Flushes updates to disk first. Then applies pending redactions,
        /// forwards the new events to the pinned event cache (if initialized),
        /// routes events to their threads (sync only), and saves bundled
        /// latest thread events.
        ///
        /// `is_sync` must be `true` for events coming from sync, `false` for
        /// backpaginated events (which can't be routed to threads).
        pub(in super::super) async fn post_process_new_events(
            &mut self,
            events: Vec<Event>,
            is_sync: bool,
        ) -> Result<(), EventCacheError> {
            // Update the store before doing the post-processing.
            self.propagate_changes().await?;

            // Need an explicit re-borrow to avoid a deref vs deref-mut borrowck conflict
            // below.
            let state = &mut *self.state;

            if let Some(pinned_event_cache) = state.pinned_event_cache.get_mut() {
                pinned_event_cache
                    .maybe_add_live_related_events(&events, &state.room_version_rules.redaction)
                    .await?;
            }

            // Events grouped by the thread they belong to; an entry with an empty vec
            // marks a thread whose summary must be reprocessed (see edits below).
            let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();

            for event in events {
                self.maybe_apply_new_redaction(&event).await?;

                if self.state.enabled_thread_support {
                    // Only add the event to a thread if:
                    // - thread support is enabled,
                    // - and if this is a sync (we can't know where to insert backpaginated events
                    //   in threads).
                    if is_sync {
                        if let Some(thread_root) = extract_thread_root(event.raw()) {
                            new_events_by_thread
                                .entry(thread_root)
                                .or_default()
                                .push(event.clone());
                        } else if let Some(event_id) = event.event_id() {
                            // If we spot the root of a thread, add it to its linked chunk.
                            if self.state.threads.contains_key(&event_id) {
                                new_events_by_thread
                                    .entry(event_id)
                                    .or_default()
                                    .push(event.clone());
                            }
                        }
                    }

                    // Look for edits that may apply to a thread; we'll process them later.
                    if let Some(edit_target) = extract_edit_target(event.raw()) {
                        // If the edited event is known, and part of a thread,
                        if let Some((_location, edit_target_event)) =
                            self.find_event(&edit_target).await?
                            && let Some(thread_root) = extract_thread_root(edit_target_event.raw())
                        {
                            // Mark the thread for processing, unless it was already marked as
                            // such.
                            new_events_by_thread.entry(thread_root).or_default();
                        }
                    }
                }

                // Save a bundled thread event, if there was one.
                if let Some(bundled_thread) = event.bundled_latest_thread_event {
                    self.save_events([*bundled_thread]).await?;
                }
            }

            if self.state.enabled_thread_support {
                self.update_threads(new_events_by_thread).await?;
            }

            Ok(())
        }
1983
1984        fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
1985            // TODO: when there's persistent storage, try to lazily reload from disk, if
1986            // missing from memory.
1987            let room_id = self.state.room_id.clone();
1988            let linked_chunk_update_sender = self.state.linked_chunk_update_sender.clone();
1989
1990            self.state.threads.entry(root_event_id.clone()).or_insert_with(|| {
1991                ThreadEventCache::new(room_id, root_event_id, linked_chunk_update_sender)
1992            })
1993        }
1994
        /// Apply batches of new live events to their respective threads, then
        /// refresh each affected thread root's summary.
        ///
        /// An entry with an empty event list still triggers a summary update
        /// (used to reprocess threads whose latest event was edited).
        #[instrument(skip_all)]
        async fn update_threads(
            &mut self,
            new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
        ) -> Result<(), EventCacheError> {
            for (thread_root, new_events) in new_events_by_thread {
                let thread_cache = self.get_or_reload_thread(thread_root.clone());

                thread_cache.add_live_events(new_events);

                let mut latest_event_id = thread_cache.latest_event_id();

                // If there's an edit to the latest event in the thread, use the latest edit
                // event id as the latest event id for the thread summary.
                if let Some(event_id) = latest_event_id.as_ref()
                    && let Some((_, edits)) = self
                        .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
                        .await?
                    && let Some(latest_edit) = edits.last()
                {
                    latest_event_id = latest_edit.event_id();
                }

                self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
            }

            Ok(())
        }
2023
        /// Update a thread summary on the given thread root, if needs be.
        ///
        /// Recomputes the reply count from the store's relations, pairs it
        /// with `latest_event_id`, and replaces the thread root event when the
        /// resulting summary differs from the previous one. A no-op if the
        /// thread root event isn't known, or if the summary is unchanged.
        async fn maybe_update_thread_summary(
            &mut self,
            thread_root: OwnedEventId,
            latest_event_id: Option<OwnedEventId>,
        ) -> Result<(), EventCacheError> {
            // Add a thread summary to the (room) event which has the thread root, if we
            // knew about it.

            let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
                trace!(%thread_root, "thread root event is missing from the room linked chunk");
                return Ok(());
            };

            let prev_summary = target_event.thread_summary.summary();

            // Recompute the thread summary, if needs be.

            // Read the latest number of thread replies from the store.
            //
            // Implementation note: since this is based on the `m.relates_to` field, and
            // that field can only be present on room messages, we don't have to
            // worry about filtering out aggregation events (like
            // reactions/edits/etc.). Pretty neat, huh?
            let num_replies = {
                let thread_replies = self
                    .store
                    .find_event_relations(
                        &self.state.room_id,
                        &thread_root,
                        Some(&[RelationType::Thread]),
                    )
                    .await?;
                // Saturate at u32::MAX rather than fail on absurdly large counts.
                thread_replies.len().try_into().unwrap_or(u32::MAX)
            };

            // A thread with zero replies has no summary at all.
            let new_summary = if num_replies > 0 {
                Some(ThreadSummary { num_replies, latest_reply: latest_event_id })
            } else {
                None
            };

            if prev_summary == new_summary.as_ref() {
                trace!(%thread_root, "thread summary is already up-to-date");
                return Ok(());
            }

            // Trigger an update to observers.
            trace!(%thread_root, "updating thread summary: {new_summary:?}");
            target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary);
            self.replace_event_at(location, target_event).await
        }
2076
2077        /// Replaces a single event, be it saved in memory or in the store.
2078        ///
2079        /// If it was saved in memory, this will emit a notification to
2080        /// observers that a single item has been replaced. Otherwise,
2081        /// such a notification is not emitted, because observers are
2082        /// unlikely to observe the store updates directly.
2083        pub(crate) async fn replace_event_at(
2084            &mut self,
2085            location: EventLocation,
2086            event: Event,
2087        ) -> Result<(), EventCacheError> {
2088            match location {
2089                EventLocation::Memory(position) => {
2090                    self.state
2091                        .room_linked_chunk
2092                        .replace_event_at(position, event)
2093                        .expect("should have been a valid position of an item");
2094                    // We just changed the in-memory representation; synchronize this with
2095                    // the store.
2096                    self.propagate_changes().await?;
2097                }
2098                EventLocation::Store => {
2099                    self.save_events([event]).await?;
2100                }
2101            }
2102
2103            Ok(())
2104        }
2105
        /// If the given event is a redaction, try to retrieve the
        /// to-be-redacted event in the chunk, and replace it by the
        /// redacted form.
        ///
        /// A no-op (returning `Ok(())`) if the event isn't an
        /// `m.room.redaction`, can't be deserialized, has no redaction target,
        /// or if the target event isn't known to the cache.
        #[instrument(skip_all)]
        async fn maybe_apply_new_redaction(
            &mut self,
            event: &Event,
        ) -> Result<(), EventCacheError> {
            let raw_event = event.raw();

            // Do not deserialise the entire event if we aren't certain it's a
            // `m.room.redaction`. It saves a non-negligible amount of computations.
            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
                raw_event.get_field::<MessageLikeEventType>("type")
            else {
                return Ok(());
            };

            // It is a `m.room.redaction`! We can deserialize it entirely.

            let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
                redaction,
            ))) = raw_event.deserialize()
            else {
                return Ok(());
            };

            // The field carrying the target id is room-version dependent, hence the
            // redaction rules parameter.
            let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else {
                warn!("missing target event id from the redaction event");
                return Ok(());
            };

            // Replace the redacted event by a redacted form, if we knew about it.
            let Some((location, mut target_event)) = self.find_event(event_id).await? else {
                trace!("redacted event is missing from the linked chunk");
                return Ok(());
            };

            // Don't redact already redacted events.
            let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
                // TODO: replace with `deserialized.is_redacted()` when
                // https://github.com/ruma/ruma/pull/2254 has been merged.
                match deserialized {
                    AnySyncTimelineEvent::MessageLike(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                    AnySyncTimelineEvent::State(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                }

                // If the event is part of a thread, update the thread linked chunk and the
                // summary.
                extract_thread_root(target_event.raw())
            } else {
                warn!("failed to deserialize the event to redact");
                None
            };

            if let Some(redacted_event) = apply_redaction(
                target_event.raw(),
                event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
                &self.state.room_version_rules.redaction,
            ) {
                // It's safe to cast `redacted_event` here:
                // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
                //   when calling .raw(), so it's still one under the hood.
                // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
                target_event.replace_raw(redacted_event.cast_unchecked());

                self.replace_event_at(location, target_event).await?;

                // If the redacted event was part of a thread, remove it in the thread linked
                // chunk too, and make sure to update the thread root's summary
                // as well.
                //
                // Note: there is an ordering issue here: the above `replace_event_at` must
                // happen BEFORE we recompute the summary, otherwise the set of
                // replies may include the to-be-redacted event.
                if let Some(thread_root) = thread_root
                    && let Some(thread_cache) = self.state.threads.get_mut(&thread_root)
                {
                    thread_cache.remove_if_present(event_id);

                    // The number of replies may have changed, so update the thread summary if
                    // needs be.
                    let latest_event_id = thread_cache.latest_event_id();
                    self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
                }
            }

            Ok(())
        }
2203
2204        /// Save events into the database, without notifying observers.
2205        pub async fn save_events(
2206            &mut self,
2207            events: impl IntoIterator<Item = Event>,
2208        ) -> Result<(), EventCacheError> {
2209            let store = self.store.clone();
2210            let room_id = self.state.room_id.clone();
2211            let events = events.into_iter().collect::<Vec<_>>();
2212
2213            // Spawn a task so the save is uninterrupted by task cancellation.
2214            spawn(async move {
2215                for event in events {
2216                    store.save_event(&room_id, event).await?;
2217                }
2218                super::Result::Ok(())
2219            })
2220            .await
2221            .expect("joining failed")?;
2222
2223            Ok(())
2224        }
2225
        /// Test-only accessor exposing whether the underlying store lock guard
        /// is marked dirty, by delegating to
        /// [`EventCacheStoreLockGuard::is_dirty`].
        #[cfg(test)]
        pub fn is_dirty(&self) -> bool {
            EventCacheStoreLockGuard::is_dirty(&self.store)
        }
2230    }
2231
    /// Load a linked chunk's full metadata, making sure the chunks are
    /// sorted according to their links.
    ///
    /// Returns `None` if there's no such linked chunk in the store, or an
    /// error if the linked chunk is malformed (no last chunk, multiple last
    /// chunks, a cycle, a missing or inconsistent predecessor link, or
    /// multiple disconnected components).
    async fn load_linked_chunk_metadata(
        store_guard: &EventCacheStoreLockGuard,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
        let mut all_chunks = store_guard
            .load_all_chunks_metadata(linked_chunk_id)
            .await
            .map_err(EventCacheError::from)?;

        if all_chunks.is_empty() {
            // There are no chunks, so there's nothing to do.
            return Ok(None);
        }

        // Transform the vector into a hashmap, for quick lookup of the predecessors.
        let chunk_map: HashMap<_, _> =
            all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();

        // Find a last chunk, i.e. one with no `next` link.
        let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
        let Some(last) = iter.next() else {
            return Err(EventCacheError::InvalidLinkedChunkMetadata {
                details: "no last chunk found".to_owned(),
            });
        };

        // There must be at most one last chunk.
        if let Some(other_last) = iter.next() {
            return Err(EventCacheError::InvalidLinkedChunkMetadata {
                details: format!(
                    "chunks {} and {} both claim to be last chunks",
                    last.identifier.index(),
                    other_last.identifier.index()
                ),
            });
        }

        // Rewind the chain back to the first chunk, and do some checks at the same
        // time.
        let mut seen = HashSet::new();
        let mut current = last;
        loop {
            // If we've already seen this chunk, there's a cycle somewhere.
            if !seen.insert(current.identifier) {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "cycle detected in linked chunk at {}",
                        current.identifier.index()
                    ),
                });
            }

            let Some(prev_id) = current.previous else {
                // If there's no previous chunk, we're done.
                // The walk must have covered every chunk, otherwise there are
                // disconnected components.
                if seen.len() != all_chunks.len() {
                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
                        details: format!(
                            "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
                            seen.len(),
                            all_chunks.len()
                        ),
                    });
                }
                break;
            };

            // If the previous chunk is not in the map, then it's unknown
            // and missing.
            let Some(pred_meta) = chunk_map.get(&prev_id) else {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "missing predecessor {} chunk for {}",
                        prev_id.index(),
                        current.identifier.index()
                    ),
                });
            };

            // If the previous chunk isn't connected to the next, then the link is invalid.
            if pred_meta.next != Some(current.identifier) {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
                        pred_meta.identifier.index(),
                        pred_meta.next.map(|chunk_id| chunk_id.index()),
                        current.identifier.index()
                    ),
                });
            }

            current = *pred_meta;
        }

        // At this point, `current` is the metadata of the first chunk.
        //
        // Reorder the resulting vector, by going through the chain of `next` links, and
        // swapping items into their final position.
        //
        // Invariant in this loop: all items in [0..i[ are in their final, correct
        // position.
        //
        // NOTE(review): the linear `position` scan inside this loop makes the
        // reordering O(n²) in the number of chunks; fine for small counts, but
        // worth an identifier→index map if linked chunks grow large.
        let mut current = current.identifier;
        for i in 0..all_chunks.len() {
            // Find the target metadata.
            let j = all_chunks
                .iter()
                .rev()
                .position(|meta| meta.identifier == current)
                .map(|j| all_chunks.len() - 1 - j)
                .expect("the target chunk must be present in the metadata");
            if i != j {
                all_chunks.swap(i, j);
            }
            if let Some(next) = all_chunks[i].next {
                current = next;
            }
        }

        Ok(Some(all_chunks))
    }
2356
2357    /// Removes the bundled relations from an event, if they were present.
2358    ///
2359    /// Only replaces the present if it contained bundled relations.
2360    fn strip_relations_if_present<T>(event: &mut Raw<T>) {
2361        // We're going to get rid of the `unsigned`/`m.relations` field, if it's
2362        // present.
2363        // Use a closure that returns an option so we can quickly short-circuit.
2364        let mut closure = || -> Option<()> {
2365            let mut val: serde_json::Value = event.deserialize_as().ok()?;
2366            let unsigned = val.get_mut("unsigned")?;
2367            let unsigned_obj = unsigned.as_object_mut()?;
2368            if unsigned_obj.remove("m.relations").is_some() {
2369                *event = Raw::new(&val).ok()?.cast_unchecked();
2370            }
2371            None
2372        };
2373        let _ = closure();
2374    }
2375
2376    fn strip_relations_from_event(ev: &mut Event) {
2377        match &mut ev.kind {
2378            TimelineEventKind::Decrypted(decrypted) => {
2379                // Remove all information about encryption info for
2380                // the bundled events.
2381                decrypted.unsigned_encryption_info = None;
2382
2383                // Remove the `unsigned`/`m.relations` field, if needs be.
2384                strip_relations_if_present(&mut decrypted.event);
2385            }
2386
2387            TimelineEventKind::UnableToDecrypt { event, .. }
2388            | TimelineEventKind::PlainText { event } => {
2389                strip_relations_if_present(event);
2390            }
2391        }
2392    }
2393
2394    /// Strips the bundled relations from a collection of events.
2395    fn strip_relations_from_events(items: &mut [Event]) {
2396        for ev in items.iter_mut() {
2397            strip_relations_from_event(ev);
2398        }
2399    }
2400
2401    /// Implementation of [`RoomEventCacheStateLockReadGuard::find_event`] and
2402    /// [`RoomEventCacheStateLockWriteGuard::find_event`].
2403    async fn find_event(
2404        event_id: &EventId,
2405        room_id: &RoomId,
2406        room_linked_chunk: &EventLinkedChunk,
2407        store: &EventCacheStoreLockGuard,
2408    ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
2409        // There are supposedly fewer events loaded in memory than in the store. Let's
2410        // start by looking up in the `EventLinkedChunk`.
2411        for (position, event) in room_linked_chunk.revents() {
2412            if event.event_id().as_deref() == Some(event_id) {
2413                return Ok(Some((EventLocation::Memory(position), event.clone())));
2414            }
2415        }
2416
2417        Ok(store.find_event(room_id, event_id).await?.map(|event| (EventLocation::Store, event)))
2418    }
2419
2420    /// Implementation of
2421    /// [`RoomEventCacheStateLockReadGuard::find_event_with_relations`] and
2422    /// [`RoomEventCacheStateLockWriteGuard::find_event_with_relations`].
2423    async fn find_event_with_relations(
2424        event_id: &EventId,
2425        room_id: &RoomId,
2426        filters: Option<Vec<RelationType>>,
2427        room_linked_chunk: &EventLinkedChunk,
2428        store: &EventCacheStoreLockGuard,
2429    ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
2430        // First, hit storage to get the target event and its related events.
2431        let found = store.find_event(room_id, event_id).await?;
2432
2433        let Some(target) = found else {
2434            // We haven't found the event: return early.
2435            return Ok(None);
2436        };
2437
2438        // Then, find the transitive closure of all the related events.
2439        let related =
2440            find_event_relations(event_id, room_id, filters, room_linked_chunk, store).await?;
2441
2442        Ok(Some((target, related)))
2443    }
2444
    /// Implementation of
    /// [`RoomEventCacheStateLockReadGuard::find_event_relations`].
    ///
    /// Computes the transitive closure of the events related to `event_id`
    /// (optionally restricted by `filters`), then sorts the results by their
    /// position in the room's linked chunk, events without a known position
    /// first.
    async fn find_event_relations(
        event_id: &EventId,
        room_id: &RoomId,
        filters: Option<Vec<RelationType>>,
        room_linked_chunk: &EventLinkedChunk,
        store: &EventCacheStoreLockGuard,
    ) -> Result<Vec<Event>, EventCacheError> {
        // Initialize the stack with all the related events, to find the
        // transitive closure of all the related events.
        let mut related = store.find_event_relations(room_id, event_id, filters.as_deref()).await?;
        let mut stack =
            related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();

        // Also keep track of already seen events, in case there's a loop in the
        // relation graph.
        let mut already_seen = HashSet::new();
        already_seen.insert(event_id.to_owned());

        // Counts store queries, the initial one included; only used for tracing.
        let mut num_iters = 1;

        // Find the related event for each previously-related event.
        while let Some(event_id) = stack.pop() {
            if !already_seen.insert(event_id.clone()) {
                // Skip events we've already seen.
                continue;
            }

            let other_related =
                store.find_event_relations(room_id, &event_id, filters.as_deref()).await?;

            // NOTE(review): if an event relates to several members of the closure,
            // it may be appended to `related` more than once — confirm whether
            // callers rely on deduplicated results.
            stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
            related.extend(other_related);

            num_iters += 1;
        }

        trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");

        // Sort the results by their positions in the linked chunk, if available.
        //
        // If an event doesn't have a known position, it goes to the start of the array.
        related.sort_by(|(_, lhs), (_, rhs)| {
            use std::cmp::Ordering;

            match (lhs, rhs) {
                (None, None) => Ordering::Equal,
                (None, Some(_)) => Ordering::Less,
                (Some(_), None) => Ordering::Greater,
                (Some(lhs), Some(rhs)) => {
                    let lhs = room_linked_chunk.event_order(*lhs);
                    let rhs = room_linked_chunk.event_order(*rhs);

                    // The events should have a definite position, but in the case they don't,
                    // still consider that not having a position means you'll end at the start
                    // of the array.
                    match (lhs, rhs) {
                        (None, None) => Ordering::Equal,
                        (None, Some(_)) => Ordering::Less,
                        (Some(_), None) => Ordering::Greater,
                        (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
                    }
                }
            }
        });

        // Keep only the events, not their positions.
        let related = related.into_iter().map(|(event, _pos)| event).collect();

        Ok(related)
    }
2517}
2518
/// An enum representing where an event has been found.
pub(super) enum EventLocation {
    /// Event lives in memory (and likely in the store!).
    ///
    /// Carries the event's [`Position`] within the in-memory linked chunk.
    Memory(Position),

    /// Event lives in the store only, it has not been loaded in memory yet.
    Store,
}
2527
2528pub(super) use private::RoomEventCacheStateLock;
2529
2530#[cfg(test)]
2531mod tests {
2532    use matrix_sdk_base::event_cache::Event;
2533    use matrix_sdk_test::{async_test, event_factory::EventFactory};
2534    use ruma::{
2535        RoomId, event_id,
2536        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2537        room_id, user_id,
2538    };
2539
2540    use crate::test_utils::logged_in_client;
2541
2542    #[async_test]
2543    async fn test_find_event_by_id_with_edit_relation() {
2544        let original_id = event_id!("$original");
2545        let related_id = event_id!("$related");
2546        let room_id = room_id!("!galette:saucisse.bzh");
2547        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2548
2549        assert_relations(
2550            room_id,
2551            f.text_msg("Original event").event_id(original_id).into(),
2552            f.text_msg("* An edited event")
2553                .edit(
2554                    original_id,
2555                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
2556                )
2557                .event_id(related_id)
2558                .into(),
2559            f,
2560        )
2561        .await;
2562    }
2563
2564    #[async_test]
2565    async fn test_find_event_by_id_with_thread_reply_relation() {
2566        let original_id = event_id!("$original");
2567        let related_id = event_id!("$related");
2568        let room_id = room_id!("!galette:saucisse.bzh");
2569        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2570
2571        assert_relations(
2572            room_id,
2573            f.text_msg("Original event").event_id(original_id).into(),
2574            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
2575            f,
2576        )
2577        .await;
2578    }
2579
2580    #[async_test]
2581    async fn test_find_event_by_id_with_reaction_relation() {
2582        let original_id = event_id!("$original");
2583        let related_id = event_id!("$related");
2584        let room_id = room_id!("!galette:saucisse.bzh");
2585        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2586
2587        assert_relations(
2588            room_id,
2589            f.text_msg("Original event").event_id(original_id).into(),
2590            f.reaction(original_id, ":D").event_id(related_id).into(),
2591            f,
2592        )
2593        .await;
2594    }
2595
2596    #[async_test]
2597    async fn test_find_event_by_id_with_poll_response_relation() {
2598        let original_id = event_id!("$original");
2599        let related_id = event_id!("$related");
2600        let room_id = room_id!("!galette:saucisse.bzh");
2601        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2602
2603        assert_relations(
2604            room_id,
2605            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2606                .event_id(original_id)
2607                .into(),
2608            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
2609            f,
2610        )
2611        .await;
2612    }
2613
2614    #[async_test]
2615    async fn test_find_event_by_id_with_poll_end_relation() {
2616        let original_id = event_id!("$original");
2617        let related_id = event_id!("$related");
2618        let room_id = room_id!("!galette:saucisse.bzh");
2619        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2620
2621        assert_relations(
2622            room_id,
2623            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2624                .event_id(original_id)
2625                .into(),
2626            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
2627            f,
2628        )
2629        .await;
2630    }
2631
    /// Checks that `find_event_with_relations` honors the relation-type
    /// filter: only relations matching the filter are returned.
    #[async_test]
    async fn test_find_event_by_id_with_filtered_relationships() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the associated related event, which reacts to the related event.
        room_event_cache.save_events([associated_related_event]).await;

        let filter = Some(vec![RelationType::Replacement]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // There's only the edit event (an edit event can't have its own edit event).
        assert_eq!(related_events.len(), 1);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);

        // Now we'll filter threads instead, there should be no related events.
        let filter = Some(vec![RelationType::Thread]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");

        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);
        // No Thread related events found.
        assert!(related_events.is_empty());
    }
2698
    /// Checks that `find_event_with_relations` follows relations transitively:
    /// a reaction to an edit of the original event is also returned.
    #[async_test]
    async fn test_find_event_by_id_with_recursive_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the associated related event, which reacts to the related event.
        room_event_cache.save_events([associated_related_event]).await;

        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, None)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // There are both the related id and the associatively related id.
        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);
    }
2752
2753    async fn assert_relations(
2754        room_id: &RoomId,
2755        original_event: Event,
2756        related_event: Event,
2757        event_factory: EventFactory,
2758    ) {
2759        let client = logged_in_client(None).await;
2760
2761        let event_cache = client.event_cache();
2762        event_cache.subscribe().unwrap();
2763
2764        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2765        let room = client.get_room(room_id).unwrap();
2766
2767        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2768
2769        // Save the original event.
2770        let original_event_id = original_event.event_id().unwrap();
2771        room_event_cache.save_events([original_event]).await;
2772
2773        // Save an unrelated event to check it's not in the related events list.
2774        let unrelated_id = event_id!("$2");
2775        room_event_cache
2776            .save_events([event_factory
2777                .text_msg("An unrelated event")
2778                .event_id(unrelated_id)
2779                .into()])
2780            .await;
2781
2782        // Save the related event.
2783        let related_id = related_event.event_id().unwrap();
2784        room_event_cache.save_events([related_event]).await;
2785
2786        let (event, related_events) = room_event_cache
2787            .find_event_with_relations(&original_event_id, None)
2788            .await
2789            .expect("Failed to find the event with relations")
2790            .expect("Event has no relation");
2791        // Fetched event is the right one.
2792        let cached_event_id = event.event_id().unwrap();
2793        assert_eq!(cached_event_id, original_event_id);
2794
2795        // There is only the actually related event in the related ones
2796        let related_event_id = related_events[0].event_id().unwrap();
2797        assert_eq!(related_event_id, related_id);
2798    }
2799}
2800
2801#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
2802mod timed_tests {
2803    use std::{ops::Not, sync::Arc};
2804
2805    use assert_matches::assert_matches;
2806    use assert_matches2::assert_let;
2807    use eyeball_im::VectorDiff;
2808    use futures_util::FutureExt;
2809    use matrix_sdk_base::{
2810        event_cache::{
2811            Gap,
2812            store::{EventCacheStore as _, MemoryStore},
2813        },
2814        linked_chunk::{
2815            ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
2816            lazy_loader::from_all_chunks,
2817        },
2818        store::StoreConfig,
2819        sync::{JoinedRoomUpdate, Timeline},
2820    };
2821    use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
2822    use ruma::{
2823        EventId, OwnedUserId, event_id,
2824        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2825        room_id, user_id,
2826    };
2827    use tokio::task::yield_now;
2828
2829    use super::RoomEventCacheGenericUpdate;
2830    use crate::{
2831        assert_let_timeout,
2832        event_cache::{RoomEventCache, RoomEventCacheUpdate, room::LoadMoreEventsBackwardsOutcome},
2833        test_utils::client::MockClientBuilder,
2834    };
2835
    /// Checks that handling a limited joined-room update persists both the
    /// gap (prev-batch token) and the new event to the event cache store.
    #[async_test]
    async fn test_write_to_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        // Keep a handle on the in-memory store, so we can inspect what was
        // persisted at the end of the test.
        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("hodlor".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Propagate an update for a message and a prev-batch token.
        let timeline = Timeline {
            limited: true,
            prev_batch: Some("raclette".to_owned()),
            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
        };

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        // Check the storage: rebuild the linked chunk from what the store
        // persisted.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        // A gap chunk (for the prev-batch token) plus an items chunk.
        assert_eq!(linked_chunk.chunks().count(), 2);

        let mut chunks = linked_chunk.chunks();

        // We start with the gap.
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });

        // Then we have the stored event.
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            let deserialized = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
        });

        // That's all, folks!
        assert!(chunks.next().is_none());
    }
2913
    // Regression-style test: bundled relations (e.g. a bundled edit in
    // `unsigned.relations`) must be kept in the in-memory linked chunk, but
    // stripped from the copy that is persisted to the event cache store.
    #[async_test]
    async fn test_write_to_storage_strips_bundled_relations() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        // Back the client's event cache with an in-memory store we can inspect.
        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("hodlor".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Propagate an update for a message with bundled relations.
        let ev = f
            .text_msg("hey yo")
            .sender(*ALICE)
            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
            .into_event();

        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        // The in-memory linked chunk keeps the bundled relation.
        {
            let events = room_event_cache.events().await.unwrap();

            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(
                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
            );

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            // The bundled edit is still attached in memory.
            assert!(original.unsigned.relations.replace.is_some());
        }

        // The one in storage does not.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 1);

        let mut chunks = linked_chunk.chunks();
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            // The persisted copy has the bundled edit stripped.
            assert!(original.unsigned.relations.replace.is_none());
        });

        // That's all, folks!
        assert!(chunks.next().is_none());
    }
3006
    // Clearing a room's event cache must empty both the in-memory linked chunk
    // and the persisted one, notify subscribers with a `Clear` diff, and yet
    // keep individual events findable via `find_event`.
    #[async_test]
    async fn test_clear() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "comté".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        // Back the client's event cache with the prefilled store.
        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("hodlor".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // The room knows about all cached events.
        {
            assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
            assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
        }

        // But only part of the events is loaded from the store.
        {
            // The room must contain only one event because only one chunk has been loaded.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].event_id().unwrap(), event_id2);

            assert!(stream.is_empty());
        }

        // Let's load more chunks to load all events.
        {
            room_event_cache.pagination().run_backwards_once(20).await.unwrap();

            assert_let_timeout!(
                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
            );
            assert_eq!(diffs.len(), 1);
            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
                // Here you are `event_id1`!
                assert_eq!(event.event_id().unwrap(), event_id1);
            });

            assert!(stream.is_empty());

            // Loading from the store also fires a generic update.
            assert_let_timeout!(
                Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
                    generic_stream.recv()
            );
            assert_eq!(room_id, expected_room_id);
            assert!(generic_stream.is_empty());
        }

        // After clearing,…
        room_event_cache.clear().await.unwrap();

        //… we get an update that the content has been cleared.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_let!(VectorDiff::Clear = &diffs[0]);

        // … same with a generic update.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
        );
        assert_eq!(received_room_id, room_id);
        assert!(generic_stream.is_empty());

        // Events individually are not forgotten by the event cache, after clearing a
        // room.
        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());

        // But their presence in a linked chunk is forgotten.
        let items = room_event_cache.events().await.unwrap();
        assert!(items.is_empty());

        // The event cache store too.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        // Note: while the event cache store could return `None` here, clearing it will
        // reset it to its initial form, maintaining the invariant that it
        // contains a single items chunk that's empty.
        assert_eq!(linked_chunk.num_items(), 0);
    }
3162
    // On startup, the event cache lazily loads only the last chunk from the
    // store; earlier chunks arrive via back-pagination, and re-synced duplicate
    // events are deduplicated (removed from their old position, appended at the
    // end).
    #[async_test]
    async fn test_load_from_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "cheddar".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        // Back the client's event cache with the prefilled store.
        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("hodlor".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        // Let's check whether the generic updates are received for the initialisation.
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // The room event cache has been loaded. A generic update must have been
        // triggered.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );
        assert!(generic_stream.is_empty());

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();

        // The initial items contain one event because only the last chunk is loaded by
        // default.
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].event_id().unwrap(), event_id2);
        assert!(stream.is_empty());

        // The event cache knows about all events though, even if they aren't loaded.
        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
        assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());

        // Let's paginate to load more events.
        room_event_cache.pagination().run_backwards_once(20).await.unwrap();

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
            assert_eq!(event.event_id().unwrap(), event_id1);
        });

        assert!(stream.is_empty());

        // A generic update is triggered too.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        // A new update with one of these events leads to deduplication.
        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct. There is a duplicate event, so
        // no generic changes whatsoever!
        assert!(generic_stream.recv().now_or_never().is_none());

        // The stream doesn't report these changes *yet*. Use the items vector given
        // when subscribing, to check that the items correspond to their new
        // positions. The duplicated item is removed (so it's not the first
        // element anymore), and it's added to the back of the list.
        let items = room_event_cache.events().await.unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].event_id().unwrap(), event_id1);
        assert_eq!(items[1].event_id().unwrap(), event_id2);
    }
3309
3310    #[async_test]
3311    async fn test_load_from_storage_resilient_to_failure() {
3312        let room_id = room_id!("!fondue:patate.ch");
3313        let event_cache_store = Arc::new(MemoryStore::new());
3314
3315        let event = EventFactory::new()
3316            .room(room_id)
3317            .sender(user_id!("@ben:saucisse.bzh"))
3318            .text_msg("foo")
3319            .event_id(event_id!("$42"))
3320            .into_event();
3321
3322        // Prefill the store with invalid data: two chunks that form a cycle.
3323        event_cache_store
3324            .handle_linked_chunk_updates(
3325                LinkedChunkId::Room(room_id),
3326                vec![
3327                    Update::NewItemsChunk {
3328                        previous: None,
3329                        new: ChunkIdentifier::new(0),
3330                        next: None,
3331                    },
3332                    Update::PushItems {
3333                        at: Position::new(ChunkIdentifier::new(0), 0),
3334                        items: vec![event],
3335                    },
3336                    Update::NewItemsChunk {
3337                        previous: Some(ChunkIdentifier::new(0)),
3338                        new: ChunkIdentifier::new(1),
3339                        next: Some(ChunkIdentifier::new(0)),
3340                    },
3341                ],
3342            )
3343            .await
3344            .unwrap();
3345
3346        let client = MockClientBuilder::new(None)
3347            .on_builder(|builder| {
3348                builder.store_config(
3349                    StoreConfig::new("holder".to_owned())
3350                        .event_cache_store(event_cache_store.clone()),
3351                )
3352            })
3353            .build()
3354            .await;
3355
3356        let event_cache = client.event_cache();
3357
3358        // Don't forget to subscribe and like.
3359        event_cache.subscribe().unwrap();
3360
3361        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3362        let room = client.get_room(room_id).unwrap();
3363
3364        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3365
3366        let items = room_event_cache.events().await.unwrap();
3367
3368        // Because the persisted content was invalid, the room store is reset: there are
3369        // no events in the cache.
3370        assert!(items.is_empty());
3371
3372        // Storage doesn't contain anything. It would also be valid that it contains a
3373        // single initial empty items chunk.
3374        let raw_chunks =
3375            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
3376        assert!(raw_chunks.is_empty());
3377    }
3378
    // A prev-batch token must only materialize as a gap chunk when it's useful:
    // a limited sync stores one (reachable after reloading), while a
    // non-limited sync with contiguous events must not insert a new gap.
    #[async_test]
    async fn test_no_useless_gaps() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        let f = EventFactory::new().room(room_id).sender(*ALICE);

        // Propagate an update including a limited timeline with one message and a
        // prev-batch token.
        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: true,
                    prev_batch: Some("raclette".to_owned()),
                    events: vec![f.text_msg("hey yo").into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        {
            // Write lock: we're about to force-load more chunks below.
            let mut state = room_event_cache.inner.state.write().await.unwrap();

            // Tally gaps vs. events over the loaded part of the linked chunk.
            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            // The limited sync unloads the chunk, so it will appear as if there are only
            // the events.
            assert_eq!(num_gaps, 0);
            assert_eq!(num_events, 1);

            // But if I manually reload more of the chunk, the gap will be present.
            assert_matches!(
                state.load_more_events_backwards().await.unwrap(),
                LoadMoreEventsBackwardsOutcome::Gap { .. }
            );

            // Recount after loading one more chunk.
            num_gaps = 0;
            num_events = 0;
            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            // The gap must have been stored.
            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 1);
        }

        // Now, propagate an update for another message, but the timeline isn't limited
        // this time.
        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: false,
                    prev_batch: Some("fondue".to_owned()),
                    events: vec![f.text_msg("sup").into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );
        assert!(generic_stream.is_empty());

        {
            // Read lock is enough: we only inspect the linked chunk here.
            let state = room_event_cache.inner.state.read().await.unwrap();

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(gap) => {
                        // Any surviving gap must be the original one; the
                        // "fondue" token must not have created a new gap.
                        assert_eq!(gap.prev_token, "raclette");
                        num_gaps += 1;
                    }
                }
            }

            // There's only the previous gap, no new ones.
            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 2);
        }
    }
3502
    // Shrinking the in-memory linked chunk back to its last chunk must emit
    // `Clear` + `Append` diffs to subscribers, keep only the last event loaded,
    // and still allow re-loading earlier events from the store without network.
    #[async_test]
    async fn test_shrink_to_last_chunk() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();

        // Fill the event cache store with an initial linked chunk with 2 events chunks.
        {
            client
                .event_cache_store()
                .lock()
                .await
                .expect("Could not acquire the event cache lock")
                .as_clean()
                .expect("Could not acquire a clean event cache lock")
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Sanity check: lazily loaded, so only includes one item at start.
        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
        assert!(stream.is_empty());

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Force loading the full linked chunk by back-paginating.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);

        // We also get an update about the loading from the store.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
            assert_eq!(value.event_id().as_deref(), Some(evid1));
        });

        assert!(stream.is_empty());

        // Same for the generic update.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);
        assert!(generic_stream.is_empty());

        // Shrink the linked chunk to the last chunk.
        let diffs = room_event_cache
            .inner
            .state
            .write()
            .await
            .unwrap()
            .force_shrink_to_last_chunk()
            .await
            .expect("shrinking should succeed");

        // We receive updates about the changes to the linked chunk: everything
        // is dropped, then the last chunk's single event is re-appended.
        assert_eq!(diffs.len(), 2);
        assert_matches!(&diffs[0], VectorDiff::Clear);
        assert_matches!(&diffs[1], VectorDiff::Append { values} => {
            assert_eq!(values.len(), 1);
            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
        });

        assert!(stream.is_empty());

        // No generic update is sent in this case.
        assert!(generic_stream.is_empty());

        // When reading the events, we do get only the last one.
        let events = room_event_cache.events().await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));

        // But if we back-paginate, we don't need access to network to find out about
        // the previous event.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);
    }
3628
    // Check `event_order`: it provides a global ordering over the whole linked
    // chunk, works for positions of events that are *not* loaded in memory,
    // and is invalidated for the old position of a deduplicated event.
    #[async_test]
    async fn test_room_ordering() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id).sender(*ALICE);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");
        let evid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
        let ev3 = f.text_msg("yo").event_id(evid3).into_event();

        // Fill the event cache store with an initial linked chunk made of two event
        // chunks: chunk 0 holds [ev1, ev2], chunk 1 holds [ev3].
        {
            client
                .event_cache_store()
                .lock()
                .await
                .expect("Could not acquire the event cache lock")
                .as_clean()
                .expect("Could not acquire a clean event cache lock")
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1, ev2],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            // Cloned: ev3 is sent again later in a sync response, to
                            // exercise deduplication.
                            items: vec![ev3.clone()],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Initially, the linked chunk only contains the last chunk, so only ev3 is
        // loaded.
        {
            let state = room_event_cache.inner.state.read().await.unwrap();
            let room_linked_chunk = state.room_linked_chunk();

            // But we can get the order of ev1, even though its chunk isn't loaded
            // in memory.
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
                Some(0)
            );

            // And that of ev2 as well.
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
                Some(1)
            );

            // ev3, which is loaded, also has a known ordering.
            let mut events = room_linked_chunk.events();
            let (pos, ev) = events.next().unwrap();
            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
            assert_eq!(ev.event_id().as_deref(), Some(evid3));
            assert_eq!(room_linked_chunk.event_order(pos), Some(2));

            // No other loaded events.
            assert!(events.next().is_none());
        }

        // Force loading the full linked chunk by back-paginating.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert!(outcome.reached_start);

        // All events are now loaded, so their order is precisely their enumerated index
        // in a linear iteration.
        {
            let state = room_event_cache.inner.state.read().await.unwrap();
            let room_linked_chunk = state.room_linked_chunk();

            for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
                assert_eq!(room_linked_chunk.event_order(pos), Some(i));
            }
        }

        // Handle a gappy sync with two events (including one duplicate, so
        // deduplication kicks in), so that the linked chunk is shrunk to the
        // last chunk, and that the linked chunk only contains the last two
        // events.
        let evid4 = event_id!("$4");
        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: true,
                    prev_batch: Some("fondue".to_owned()),
                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        {
            let state = room_event_cache.inner.state.read().await.unwrap();
            let room_linked_chunk = state.room_linked_chunk();

            // After the shrink, only evid3 and evid4 are loaded.
            let mut events = room_linked_chunk.events();

            let (pos, ev) = events.next().unwrap();
            assert_eq!(ev.event_id().as_deref(), Some(evid3));
            assert_eq!(room_linked_chunk.event_order(pos), Some(2));

            let (pos, ev) = events.next().unwrap();
            assert_eq!(ev.event_id().as_deref(), Some(evid4));
            assert_eq!(room_linked_chunk.event_order(pos), Some(3));

            // No other loaded events.
            assert!(events.next().is_none());

            // But we can still get the order of previous events, whose chunk is
            // now unloaded.
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
                Some(0)
            );
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
                Some(1)
            );

            // ev3 doesn't have an order with its previous position, since it's been
            // deduplicated (it now lives at the position checked above, order 2).
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
                None
            );
        }
    }
3786
3787    #[async_test]
3788    async fn test_auto_shrink_after_all_subscribers_are_gone() {
3789        let room_id = room_id!("!galette:saucisse.bzh");
3790
3791        let client = MockClientBuilder::new(None).build().await;
3792
3793        let f = EventFactory::new().room(room_id);
3794
3795        let evid1 = event_id!("$1");
3796        let evid2 = event_id!("$2");
3797
3798        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3799        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3800
3801        // Fill the event cache store with an initial linked chunk with 2 events chunks.
3802        {
3803            client
3804                .event_cache_store()
3805                .lock()
3806                .await
3807                .expect("Could not acquire the event cache lock")
3808                .as_clean()
3809                .expect("Could not acquire a clean event cache lock")
3810                .handle_linked_chunk_updates(
3811                    LinkedChunkId::Room(room_id),
3812                    vec![
3813                        Update::NewItemsChunk {
3814                            previous: None,
3815                            new: ChunkIdentifier::new(0),
3816                            next: None,
3817                        },
3818                        Update::PushItems {
3819                            at: Position::new(ChunkIdentifier::new(0), 0),
3820                            items: vec![ev1],
3821                        },
3822                        Update::NewItemsChunk {
3823                            previous: Some(ChunkIdentifier::new(0)),
3824                            new: ChunkIdentifier::new(1),
3825                            next: None,
3826                        },
3827                        Update::PushItems {
3828                            at: Position::new(ChunkIdentifier::new(1), 0),
3829                            items: vec![ev2],
3830                        },
3831                    ],
3832                )
3833                .await
3834                .unwrap();
3835        }
3836
3837        let event_cache = client.event_cache();
3838        event_cache.subscribe().unwrap();
3839
3840        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3841        let room = client.get_room(room_id).unwrap();
3842        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3843
3844        // Sanity check: lazily loaded, so only includes one item at start.
3845        let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
3846        assert_eq!(events1.len(), 1);
3847        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
3848        assert!(stream1.is_empty());
3849
3850        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3851
3852        // Force loading the full linked chunk by back-paginating.
3853        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3854        assert_eq!(outcome.events.len(), 1);
3855        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3856        assert!(outcome.reached_start);
3857
3858        // We also get an update about the loading from the store. Ignore it, for this
3859        // test's sake.
3860        assert_let_timeout!(
3861            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
3862        );
3863        assert_eq!(diffs.len(), 1);
3864        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3865            assert_eq!(value.event_id().as_deref(), Some(evid1));
3866        });
3867
3868        assert!(stream1.is_empty());
3869
3870        assert_let_timeout!(
3871            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
3872        );
3873        assert_eq!(expected_room_id, room_id);
3874        assert!(generic_stream.is_empty());
3875
3876        // Have another subscriber.
3877        // Since it's not the first one, and the previous one loaded some more events,
3878        // the second subscribers sees them all.
3879        let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
3880        assert_eq!(events2.len(), 2);
3881        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
3882        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
3883        assert!(stream2.is_empty());
3884
3885        // Drop the first stream, and wait a bit.
3886        drop(stream1);
3887        yield_now().await;
3888
3889        // The second stream remains undisturbed.
3890        assert!(stream2.is_empty());
3891
3892        // Now drop the second stream, and wait a bit.
3893        drop(stream2);
3894        yield_now().await;
3895
3896        // The linked chunk must have auto-shrunk by now.
3897
3898        {
3899            // Check the inner state: there's no more shared auto-shrinker.
3900            let state = room_event_cache.inner.state.read().await.unwrap();
3901            assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
3902        }
3903
3904        // Getting the events will only give us the latest chunk.
3905        let events3 = room_event_cache.events().await.unwrap();
3906        assert_eq!(events3.len(), 1);
3907        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
3908    }
3909
3910    #[async_test]
3911    async fn test_rfind_map_event_in_memory_by() {
3912        let user_id = user_id!("@mnt_io:matrix.org");
3913        let room_id = room_id!("!raclette:patate.ch");
3914        let client = MockClientBuilder::new(None).build().await;
3915
3916        let event_factory = EventFactory::new().room(room_id);
3917
3918        let event_id_0 = event_id!("$ev0");
3919        let event_id_1 = event_id!("$ev1");
3920        let event_id_2 = event_id!("$ev2");
3921        let event_id_3 = event_id!("$ev3");
3922
3923        let event_0 =
3924            event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3925        let event_1 =
3926            event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3927        let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3928        let event_3 =
3929            event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3930
3931        // Fill the event cache store with an initial linked chunk of 2 chunks, and 4
3932        // events.
3933        {
3934            client
3935                .event_cache_store()
3936                .lock()
3937                .await
3938                .expect("Could not acquire the event cache lock")
3939                .as_clean()
3940                .expect("Could not acquire a clean event cache lock")
3941                .handle_linked_chunk_updates(
3942                    LinkedChunkId::Room(room_id),
3943                    vec![
3944                        Update::NewItemsChunk {
3945                            previous: None,
3946                            new: ChunkIdentifier::new(0),
3947                            next: None,
3948                        },
3949                        Update::PushItems {
3950                            at: Position::new(ChunkIdentifier::new(0), 0),
3951                            items: vec![event_3],
3952                        },
3953                        Update::NewItemsChunk {
3954                            previous: Some(ChunkIdentifier::new(0)),
3955                            new: ChunkIdentifier::new(1),
3956                            next: None,
3957                        },
3958                        Update::PushItems {
3959                            at: Position::new(ChunkIdentifier::new(1), 0),
3960                            items: vec![event_0, event_1, event_2],
3961                        },
3962                    ],
3963                )
3964                .await
3965                .unwrap();
3966        }
3967
3968        let event_cache = client.event_cache();
3969        event_cache.subscribe().unwrap();
3970
3971        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3972        let room = client.get_room(room_id).unwrap();
3973        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3974
3975        // Look for an event from `BOB`: it must be `event_0`.
3976        assert_matches!(
3977            room_event_cache
3978                .rfind_map_event_in_memory_by(|event, previous_event| {
3979                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| (event.event_id(), previous_event.and_then(|event| event.event_id())))
3980                })
3981                .await,
3982            Ok(Some((event_id, previous_event_id))) => {
3983                assert_eq!(event_id.as_deref(), Some(event_id_0));
3984                assert!(previous_event_id.is_none());
3985            }
3986        );
3987
3988        // Look for an event from `ALICE`: it must be `event_2`, right before `event_1`
3989        // because events are looked for in reverse order.
3990        assert_matches!(
3991            room_event_cache
3992                .rfind_map_event_in_memory_by(|event, previous_event| {
3993                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| (event.event_id(), previous_event.and_then(|event| event.event_id())))
3994                })
3995                .await,
3996            Ok(Some((event_id, previous_event_id))) => {
3997                assert_eq!(event_id.as_deref(), Some(event_id_2));
3998                assert_eq!(previous_event_id.as_deref(), Some(event_id_1));
3999            }
4000        );
4001
4002        // Look for an event that is inside the storage, but not loaded.
4003        assert!(
4004            room_event_cache
4005                .rfind_map_event_in_memory_by(|event, _| {
4006                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
4007                        == Some(user_id))
4008                    .then(|| event.event_id())
4009                })
4010                .await
4011                .unwrap()
4012                .is_none()
4013        );
4014
4015        // Look for an event that doesn't exist.
4016        assert!(
4017            room_event_cache
4018                .rfind_map_event_in_memory_by(|_, _| None::<()>)
4019                .await
4020                .unwrap()
4021                .is_none()
4022        );
4023    }
4024
4025    #[async_test]
4026    async fn test_reload_when_dirty() {
4027        let user_id = user_id!("@mnt_io:matrix.org");
4028        let room_id = room_id!("!raclette:patate.ch");
4029
4030        // The storage shared by the two clients.
4031        let event_cache_store = MemoryStore::new();
4032
4033        // Client for the process 0.
4034        let client_p0 = MockClientBuilder::new(None)
4035            .on_builder(|builder| {
4036                builder.store_config(
4037                    StoreConfig::new("process #0".to_owned())
4038                        .event_cache_store(event_cache_store.clone()),
4039                )
4040            })
4041            .build()
4042            .await;
4043
4044        // Client for the process 1.
4045        let client_p1 = MockClientBuilder::new(None)
4046            .on_builder(|builder| {
4047                builder.store_config(
4048                    StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
4049                )
4050            })
4051            .build()
4052            .await;
4053
4054        let event_factory = EventFactory::new().room(room_id).sender(user_id);
4055
4056        let ev_id_0 = event_id!("$ev_0");
4057        let ev_id_1 = event_id!("$ev_1");
4058
4059        let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
4060        let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
4061
4062        // Add events to the storage (shared by the two clients!).
4063        client_p0
4064            .event_cache_store()
4065            .lock()
4066            .await
4067            .expect("[p0] Could not acquire the event cache lock")
4068            .as_clean()
4069            .expect("[p0] Could not acquire a clean event cache lock")
4070            .handle_linked_chunk_updates(
4071                LinkedChunkId::Room(room_id),
4072                vec![
4073                    Update::NewItemsChunk {
4074                        previous: None,
4075                        new: ChunkIdentifier::new(0),
4076                        next: None,
4077                    },
4078                    Update::PushItems {
4079                        at: Position::new(ChunkIdentifier::new(0), 0),
4080                        items: vec![ev_0],
4081                    },
4082                    Update::NewItemsChunk {
4083                        previous: Some(ChunkIdentifier::new(0)),
4084                        new: ChunkIdentifier::new(1),
4085                        next: None,
4086                    },
4087                    Update::PushItems {
4088                        at: Position::new(ChunkIdentifier::new(1), 0),
4089                        items: vec![ev_1],
4090                    },
4091                ],
4092            )
4093            .await
4094            .unwrap();
4095
4096        // Subscribe the event caches, and create the room.
4097        let (room_event_cache_p0, room_event_cache_p1) = {
4098            let event_cache_p0 = client_p0.event_cache();
4099            event_cache_p0.subscribe().unwrap();
4100
4101            let event_cache_p1 = client_p1.event_cache();
4102            event_cache_p1.subscribe().unwrap();
4103
4104            client_p0.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
4105            client_p1.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
4106
4107            let (room_event_cache_p0, _drop_handles) =
4108                client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
4109            let (room_event_cache_p1, _drop_handles) =
4110                client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
4111
4112            (room_event_cache_p0, room_event_cache_p1)
4113        };
4114
4115        // Okay. We are ready for the test!
4116        //
4117        // First off, let's check `room_event_cache_p0` has access to the first event
4118        // loaded in-memory, then do a pagination, and see more events.
4119        let mut updates_stream_p0 = {
4120            let room_event_cache = &room_event_cache_p0;
4121
4122            let (initial_updates, mut updates_stream) =
4123                room_event_cache_p0.subscribe().await.unwrap();
4124
4125            // Initial updates contain `ev_id_1` only.
4126            assert_eq!(initial_updates.len(), 1);
4127            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
4128            assert!(updates_stream.is_empty());
4129
4130            // `ev_id_1` must be loaded in memory.
4131            assert!(event_loaded(room_event_cache, ev_id_1).await);
4132
4133            // `ev_id_0` must NOT be loaded in memory.
4134            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4135
4136            // Load one more event with a backpagination.
4137            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4138
4139            // A new update for `ev_id_0` must be present.
4140            assert_matches!(
4141                updates_stream.recv().await.unwrap(),
4142                RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4143                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
4144                    assert_matches!(
4145                        &diffs[0],
4146                        VectorDiff::Insert { index: 0, value: event } => {
4147                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4148                        }
4149                    );
4150                }
4151            );
4152
4153            // `ev_id_0` must now be loaded in memory.
4154            assert!(event_loaded(room_event_cache, ev_id_0).await);
4155
4156            updates_stream
4157        };
4158
4159        // Second, let's check `room_event_cache_p1` has the same accesses.
4160        let mut updates_stream_p1 = {
4161            let room_event_cache = &room_event_cache_p1;
4162            let (initial_updates, mut updates_stream) =
4163                room_event_cache_p1.subscribe().await.unwrap();
4164
4165            // Initial updates contain `ev_id_1` only.
4166            assert_eq!(initial_updates.len(), 1);
4167            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
4168            assert!(updates_stream.is_empty());
4169
4170            // `ev_id_1` must be loaded in memory.
4171            assert!(event_loaded(room_event_cache, ev_id_1).await);
4172
4173            // `ev_id_0` must NOT be loaded in memory.
4174            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4175
4176            // Load one more event with a backpagination.
4177            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4178
4179            // A new update for `ev_id_0` must be present.
4180            assert_matches!(
4181                updates_stream.recv().await.unwrap(),
4182                RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4183                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
4184                    assert_matches!(
4185                        &diffs[0],
4186                        VectorDiff::Insert { index: 0, value: event } => {
4187                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4188                        }
4189                    );
4190                }
4191            );
4192
4193            // `ev_id_0` must now be loaded in memory.
4194            assert!(event_loaded(room_event_cache, ev_id_0).await);
4195
4196            updates_stream
4197        };
4198
4199        // Do this a couple times, for the fun.
4200        for _ in 0..3 {
4201            // Third, because `room_event_cache_p1` has locked the store, the lock
4202            // is dirty for `room_event_cache_p0`, so it will shrink to its last
4203            // chunk!
4204            {
4205                let room_event_cache = &room_event_cache_p0;
4206                let updates_stream = &mut updates_stream_p0;
4207
4208                // `ev_id_1` must be loaded in memory, just like before.
4209                assert!(event_loaded(room_event_cache, ev_id_1).await);
4210
4211                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
4212                // state has been reloaded to its last chunk.
4213                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4214
4215                // The reload can be observed via the updates too.
4216                assert_matches!(
4217                    updates_stream.recv().await.unwrap(),
4218                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4219                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4220                        assert_matches!(&diffs[0], VectorDiff::Clear);
4221                        assert_matches!(
4222                            &diffs[1],
4223                            VectorDiff::Append { values: events } => {
4224                                assert_eq!(events.len(), 1);
4225                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4226                            }
4227                        );
4228                    }
4229                );
4230
4231                // Load one more event with a backpagination.
4232                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4233
4234                // `ev_id_0` must now be loaded in memory.
4235                assert!(event_loaded(room_event_cache, ev_id_0).await);
4236
4237                // The pagination can be observed via the updates too.
4238                assert_matches!(
4239                    updates_stream.recv().await.unwrap(),
4240                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4241                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4242                        assert_matches!(
4243                            &diffs[0],
4244                            VectorDiff::Insert { index: 0, value: event } => {
4245                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4246                            }
4247                        );
4248                    }
4249                );
4250            }
4251
4252            // Fourth, because `room_event_cache_p0` has locked the store again, the lock
4253            // is dirty for `room_event_cache_p1` too!, so it will shrink to its last
4254            // chunk!
4255            {
4256                let room_event_cache = &room_event_cache_p1;
4257                let updates_stream = &mut updates_stream_p1;
4258
4259                // `ev_id_1` must be loaded in memory, just like before.
4260                assert!(event_loaded(room_event_cache, ev_id_1).await);
4261
4262                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
4263                // state has shrunk to its last chunk.
4264                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4265
4266                // The reload can be observed via the updates too.
4267                assert_matches!(
4268                    updates_stream.recv().await.unwrap(),
4269                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4270                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4271                        assert_matches!(&diffs[0], VectorDiff::Clear);
4272                        assert_matches!(
4273                            &diffs[1],
4274                            VectorDiff::Append { values: events } => {
4275                                assert_eq!(events.len(), 1);
4276                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4277                            }
4278                        );
4279                    }
4280                );
4281
4282                // Load one more event with a backpagination.
4283                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4284
4285                // `ev_id_0` must now be loaded in memory.
4286                assert!(event_loaded(room_event_cache, ev_id_0).await);
4287
4288                // The pagination can be observed via the updates too.
4289                assert_matches!(
4290                    updates_stream.recv().await.unwrap(),
4291                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4292                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4293                        assert_matches!(
4294                            &diffs[0],
4295                            VectorDiff::Insert { index: 0, value: event } => {
4296                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4297                            }
4298                        );
4299                    }
4300                );
4301            }
4302        }
4303
4304        // Repeat that with an explicit read lock (so that we don't rely on
4305        // `event_loaded` to trigger the dirty detection).
4306        for _ in 0..3 {
4307            {
4308                let room_event_cache = &room_event_cache_p0;
4309                let updates_stream = &mut updates_stream_p0;
4310
4311                let guard = room_event_cache.inner.state.read().await.unwrap();
4312
4313                // Guard is kept alive, to ensure we can have multiple read guards alive with a
4314                // shared access.
4315                // See `RoomEventCacheStateLock::read` to learn more.
4316
4317                // The lock is no longer marked as dirty, it's been cleaned.
4318                assert!(guard.is_dirty().not());
4319
4320                // The reload can be observed via the updates too.
4321                assert_matches!(
4322                    updates_stream.recv().await.unwrap(),
4323                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4324                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4325                        assert_matches!(&diffs[0], VectorDiff::Clear);
4326                        assert_matches!(
4327                            &diffs[1],
4328                            VectorDiff::Append { values: events } => {
4329                                assert_eq!(events.len(), 1);
4330                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4331                            }
4332                        );
4333                    }
4334                );
4335
4336                assert!(event_loaded(room_event_cache, ev_id_1).await);
4337                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4338
4339                // Ensure `guard` is alive up to this point (in case this test is refactored, I
4340                // want to make this super explicit).
4341                //
                // We need to drop it before the pagination because the pagination needs to
4343                // obtain a write lock.
4344                drop(guard);
4345
4346                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4347                assert!(event_loaded(room_event_cache, ev_id_0).await);
4348
4349                // The pagination can be observed via the updates too.
4350                assert_matches!(
4351                    updates_stream.recv().await.unwrap(),
4352                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4353                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4354                        assert_matches!(
4355                            &diffs[0],
4356                            VectorDiff::Insert { index: 0, value: event } => {
4357                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4358                            }
4359                        );
4360                    }
4361                );
4362            }
4363
4364            {
4365                let room_event_cache = &room_event_cache_p1;
4366                let updates_stream = &mut updates_stream_p1;
4367
4368                let guard = room_event_cache.inner.state.read().await.unwrap();
4369
4370                // Guard is kept alive, to ensure we can have multiple read guards alive with a
4371                // shared access.
4372
4373                // The lock is no longer marked as dirty, it's been cleaned.
4374                assert!(guard.is_dirty().not());
4375
4376                // The reload can be observed via the updates too.
4377                assert_matches!(
4378                    updates_stream.recv().await.unwrap(),
4379                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4380                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4381                        assert_matches!(&diffs[0], VectorDiff::Clear);
4382                        assert_matches!(
4383                            &diffs[1],
4384                            VectorDiff::Append { values: events } => {
4385                                assert_eq!(events.len(), 1);
4386                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4387                            }
4388                        );
4389                    }
4390                );
4391
4392                assert!(event_loaded(room_event_cache, ev_id_1).await);
4393                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4394
4395                // Ensure `guard` is alive up to this point (in case this test is refactored, I
4396                // want to make this super explicit).
4397                //
                // We need to drop it before the pagination because the pagination needs to
4399                // obtain a write lock.
4400                drop(guard);
4401
4402                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4403                assert!(event_loaded(room_event_cache, ev_id_0).await);
4404
4405                // The pagination can be observed via the updates too.
4406                assert_matches!(
4407                    updates_stream.recv().await.unwrap(),
4408                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4409                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4410                        assert_matches!(
4411                            &diffs[0],
4412                            VectorDiff::Insert { index: 0, value: event } => {
4413                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4414                            }
4415                        );
4416                    }
4417                );
4418            }
4419        }
4420
4421        // Repeat that with an explicit write lock.
4422        for _ in 0..3 {
4423            {
4424                let room_event_cache = &room_event_cache_p0;
4425                let updates_stream = &mut updates_stream_p0;
4426
4427                let guard = room_event_cache.inner.state.write().await.unwrap();
4428
4429                // The lock is no longer marked as dirty, it's been cleaned.
4430                assert!(guard.is_dirty().not());
4431
4432                // The reload can be observed via the updates too.
4433                assert_matches!(
4434                    updates_stream.recv().await.unwrap(),
4435                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4436                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4437                        assert_matches!(&diffs[0], VectorDiff::Clear);
4438                        assert_matches!(
4439                            &diffs[1],
4440                            VectorDiff::Append { values: events } => {
4441                                assert_eq!(events.len(), 1);
4442                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4443                            }
4444                        );
4445                    }
4446                );
4447
4448                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
4449                // needs to obtain a read lock.
4450                drop(guard);
4451
4452                assert!(event_loaded(room_event_cache, ev_id_1).await);
4453                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4454
4455                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4456                assert!(event_loaded(room_event_cache, ev_id_0).await);
4457
4458                // The pagination can be observed via the updates too.
4459                assert_matches!(
4460                    updates_stream.recv().await.unwrap(),
4461                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4462                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4463                        assert_matches!(
4464                            &diffs[0],
4465                            VectorDiff::Insert { index: 0, value: event } => {
4466                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4467                            }
4468                        );
4469                    }
4470                );
4471            }
4472
4473            {
4474                let room_event_cache = &room_event_cache_p1;
4475                let updates_stream = &mut updates_stream_p1;
4476
4477                let guard = room_event_cache.inner.state.write().await.unwrap();
4478
4479                // The lock is no longer marked as dirty, it's been cleaned.
4480                assert!(guard.is_dirty().not());
4481
4482                // The reload can be observed via the updates too.
4483                assert_matches!(
4484                    updates_stream.recv().await.unwrap(),
4485                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4486                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4487                        assert_matches!(&diffs[0], VectorDiff::Clear);
4488                        assert_matches!(
4489                            &diffs[1],
4490                            VectorDiff::Append { values: events } => {
4491                                assert_eq!(events.len(), 1);
4492                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4493                            }
4494                        );
4495                    }
4496                );
4497
4498                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
4499                // needs to obtain a read lock.
4500                drop(guard);
4501
4502                assert!(event_loaded(room_event_cache, ev_id_1).await);
4503                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4504
4505                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4506                assert!(event_loaded(room_event_cache, ev_id_0).await);
4507
4508                // The pagination can be observed via the updates too.
4509                assert_matches!(
4510                    updates_stream.recv().await.unwrap(),
4511                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4512                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4513                        assert_matches!(
4514                            &diffs[0],
4515                            VectorDiff::Insert { index: 0, value: event } => {
4516                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4517                            }
4518                        );
4519                    }
4520                );
4521            }
4522        }
4523    }
4524
    #[async_test]
    async fn test_load_when_dirty() {
        // Two rooms: `room_id_0` is used to dirty the cross-process lock,
        // `room_id_1` is the room whose event cache is created *while* the
        // lock is dirty.
        let room_id_0 = room_id!("!raclette:patate.ch");
        let room_id_1 = room_id!("!morbiflette:patate.ch");

        // The storage shared by the two clients, simulating two processes
        // accessing the same event cache store.
        let event_cache_store = MemoryStore::new();

        // Client for process 0.
        let client_p0 = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("process #0".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        // Client for process 1, sharing the same underlying store.
        let client_p1 = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
                )
            })
            .build()
            .await;

        // Subscribe the event caches, and create the rooms on both clients.
        let (room_event_cache_0_p0, room_event_cache_0_p1) = {
            let event_cache_p0 = client_p0.event_cache();
            event_cache_p0.subscribe().unwrap();

            let event_cache_p1 = client_p1.event_cache();
            event_cache_p1.subscribe().unwrap();

            client_p0
                .base_client()
                .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
            client_p0
                .base_client()
                .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);

            client_p1
                .base_client()
                .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
            client_p1
                .base_client()
                .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);

            let (room_event_cache_0_p0, _drop_handles) =
                client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
            let (room_event_cache_0_p1, _drop_handles) =
                client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();

            (room_event_cache_0_p0, room_event_cache_0_p1)
        };

        // Let's make the cross-process lock over the store dirty: each process
        // takes (and releases) a read lock in turn, so the lock holder changes
        // hands between the two processes.
        {
            drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
            drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
        }

        // Create the `RoomEventCache` for `room_id_1`. During its creation, the
        // cross-process lock over the store MUST be dirty, which makes no difference
        // compared to a clean one: the state is just loaded, not reloaded.
        let (room_event_cache_1_p0, _) =
            client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();

        // Check the lock isn't dirty because it's been cleared.
        {
            let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
            assert!(guard.is_dirty().not());
        }

        // The only way to test this behaviour is to see that the dirty block in
        // `RoomEventCacheStateLock` is covered by this test.
    }
4605
4606    async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
4607        room_event_cache
4608            .rfind_map_event_in_memory_by(|event, _previous_event_id| {
4609                (event.event_id().as_deref() == Some(event_id)).then_some(())
4610            })
4611            .await
4612            .unwrap()
4613            .is_some()
4614    }
4615}