matrix_sdk/event_cache/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All event cache types for a single room.
16
17use std::{
18    collections::BTreeMap,
19    fmt,
20    ops::{Deref, DerefMut},
21    sync::{
22        Arc,
23        atomic::{AtomicUsize, Ordering},
24    },
25};
26
27use events::sort_positions_descending;
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31    deserialized_responses::AmbiguityChange,
32    event_cache::Event,
33    linked_chunk::Position,
34    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
35};
36use ruma::{
37    EventId, OwnedEventId, OwnedRoomId,
38    api::Direction,
39    events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
40    serde::Raw,
41};
42use tokio::sync::{
43    Notify,
44    broadcast::{Receiver, Sender},
45    mpsc,
46};
47use tracing::{instrument, trace, warn};
48
49use super::{
50    AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
51    RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
52};
53use crate::{
54    client::WeakClient,
55    event_cache::EventCacheError,
56    room::{IncludeRelations, RelationsOptions, WeakRoom},
57};
58
59pub(super) mod events;
60mod threads;
61
62pub use threads::ThreadEventCacheUpdate;
63
64/// A subset of an event cache, for a room.
65///
66/// Cloning is shallow, and thus is cheap to do.
67#[derive(Clone)]
68pub struct RoomEventCache {
69    pub(super) inner: Arc<RoomEventCacheInner>,
70}
71
72impl fmt::Debug for RoomEventCache {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        f.debug_struct("RoomEventCache").finish_non_exhaustive()
75    }
76}
77
78/// Thin wrapper for a room event cache subscriber, so as to trigger
79/// side-effects when all subscribers are gone.
80///
81/// The current side-effect is: auto-shrinking the [`RoomEventCache`] when no
82/// more subscribers are active. This is an optimisation to reduce the number of
83/// data held in memory by a [`RoomEventCache`]: when no more subscribers are
84/// active, all data are reduced to the minimum.
85///
86/// The side-effect takes effect on `Drop`.
87#[allow(missing_debug_implementations)]
88pub struct RoomEventCacheSubscriber {
89    /// Underlying receiver of the room event cache's updates.
90    recv: Receiver<RoomEventCacheUpdate>,
91
92    /// To which room are we listening?
93    room_id: OwnedRoomId,
94
95    /// Sender to the auto-shrink channel.
96    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
97
98    /// Shared instance of the auto-shrinker.
99    subscriber_count: Arc<AtomicUsize>,
100}
101
102impl Drop for RoomEventCacheSubscriber {
103    fn drop(&mut self) {
104        let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
105
106        trace!(
107            "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
108        );
109
110        if previous_subscriber_count == 1 {
111            // We were the last instance of the subscriber; let the auto-shrinker know by
112            // notifying it of our room id.
113
114            let mut room_id = self.room_id.clone();
115
116            // Try to send without waiting for channel capacity, and restart in a spin-loop
117            // if it failed (until a maximum number of attempts is reached, or
118            // the send was successful). The channel shouldn't be super busy in
119            // general, so this should resolve quickly enough.
120
121            let mut num_attempts = 0;
122
123            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
124                num_attempts += 1;
125
126                if num_attempts > 1024 {
127                    // If we've tried too many times, just give up with a warning; after all, this
128                    // is only an optimization.
129                    warn!(
130                        "couldn't send notification to the auto-shrink channel \
131                         after 1024 attempts; giving up"
132                    );
133                    return;
134                }
135
136                match err {
137                    mpsc::error::TrySendError::Full(stolen_room_id) => {
138                        room_id = stolen_room_id;
139                    }
140                    mpsc::error::TrySendError::Closed(_) => return,
141                }
142            }
143
144            trace!("sent notification to the parent channel that we were the last subscriber");
145        }
146    }
147}
148
149impl Deref for RoomEventCacheSubscriber {
150    type Target = Receiver<RoomEventCacheUpdate>;
151
152    fn deref(&self) -> &Self::Target {
153        &self.recv
154    }
155}
156
157impl DerefMut for RoomEventCacheSubscriber {
158    fn deref_mut(&mut self) -> &mut Self::Target {
159        &mut self.recv
160    }
161}
162
163impl RoomEventCache {
164    /// Create a new [`RoomEventCache`] using the given room and store.
165    pub(super) fn new(
166        client: WeakClient,
167        state: RoomEventCacheStateLock,
168        pagination_status: SharedObservable<RoomPaginationStatus>,
169        room_id: OwnedRoomId,
170        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
171        update_sender: Sender<RoomEventCacheUpdate>,
172        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
173    ) -> Self {
174        Self {
175            inner: Arc::new(RoomEventCacheInner::new(
176                client,
177                state,
178                pagination_status,
179                room_id,
180                auto_shrink_sender,
181                update_sender,
182                generic_update_sender,
183            )),
184        }
185    }
186
187    /// Read all current events.
188    ///
189    /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
190    /// subscriber.
191    pub async fn events(&self) -> Result<Vec<Event>> {
192        let state = self.inner.state.read().await?;
193
194        Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
195    }
196
197    /// Subscribe to this room updates, after getting the initial list of
198    /// events.
199    ///
200    /// Use [`RoomEventCache::events`] to get all current events without the
201    /// subscriber. Creating, and especially dropping, a
202    /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
203    pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
204        let state = self.inner.state.read().await?;
205        let events =
206            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
207
208        let subscriber_count = state.subscriber_count();
209        let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
210        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
211
212        let recv = self.inner.update_sender.subscribe();
213        let subscriber = RoomEventCacheSubscriber {
214            recv,
215            room_id: self.inner.room_id.clone(),
216            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
217            subscriber_count: subscriber_count.clone(),
218        };
219
220        Ok((events, subscriber))
221    }
222
223    /// Subscribe to thread for a given root event, and get a (maybe empty)
224    /// initially known list of events for that thread.
225    pub async fn subscribe_to_thread(
226        &self,
227        thread_root: OwnedEventId,
228    ) -> Result<(Vec<Event>, Receiver<ThreadEventCacheUpdate>)> {
229        let mut state = self.inner.state.write().await?;
230        Ok(state.subscribe_to_thread(thread_root))
231    }
232
233    /// Paginate backwards in a thread, given its root event ID.
234    ///
235    /// Returns whether we've hit the start of the thread, in which case the
236    /// root event will be prepended to the thread.
237    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
238    pub async fn paginate_thread_backwards(
239        &self,
240        thread_root: OwnedEventId,
241        num_events: u16,
242    ) -> Result<bool> {
243        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
244
245        // Take the lock only for a short time here.
246        let mut outcome =
247            self.inner.state.write().await?.load_more_thread_events_backwards(thread_root.clone());
248
249        loop {
250            match outcome {
251                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
252                    // Start a threaded pagination from this gap.
253                    let options = RelationsOptions {
254                        from: prev_token.clone(),
255                        dir: Direction::Backward,
256                        limit: Some(num_events.into()),
257                        include_relations: IncludeRelations::AllRelations,
258                        recurse: true,
259                    };
260
261                    let mut result = room
262                        .relations(thread_root.clone(), options)
263                        .await
264                        .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
265
266                    let reached_start = result.next_batch_token.is_none();
267                    trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");
268
269                    // Because the state lock is taken again in `load_or_fetch_event`, we need
270                    // to do this *before* we take the state lock again.
271                    let root_event =
272                        if reached_start {
273                            // Prepend the thread root event to the results.
274                            Some(room.load_or_fetch_event(&thread_root, None).await.map_err(
275                                |err| EventCacheError::BackpaginationError(Box::new(err)),
276                            )?)
277                        } else {
278                            None
279                        };
280
281                    let mut state = self.inner.state.write().await?;
282
283                    // Save all the events (but the thread root) in the store.
284                    state.save_events(result.chunk.iter().cloned()).await?;
285
286                    // Note: the events are still in the reversed order at this point, so
287                    // pushing will eventually make it so that the root event is the first.
288                    result.chunk.extend(root_event);
289
290                    if let Some(outcome) = state.finish_thread_network_pagination(
291                        thread_root.clone(),
292                        prev_token,
293                        result.next_batch_token,
294                        result.chunk,
295                    ) {
296                        return Ok(outcome.reached_start);
297                    }
298
299                    // fallthrough: restart the pagination.
300                    outcome = state.load_more_thread_events_backwards(thread_root.clone());
301                }
302
303                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
304                    // We're done!
305                    return Ok(true);
306                }
307
308                LoadMoreEventsBackwardsOutcome::Events { .. } => {
309                    // TODO: implement :)
310                    unimplemented!("loading from disk for threads is not implemented yet");
311                }
312            }
313        }
314    }
315
316    /// Return a [`RoomPagination`] API object useful for running
317    /// back-pagination queries in the current room.
318    pub fn pagination(&self) -> RoomPagination {
319        RoomPagination { inner: self.inner.clone() }
320    }
321
322    /// Try to find a single event in this room, starting from the most recent
323    /// event.
324    ///
325    /// **Warning**! It looks into the loaded events from the in-memory linked
326    /// chunk **only**. It doesn't look inside the storage.
327    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
328    where
329        P: FnMut(&Event) -> Option<O>,
330    {
331        Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
332    }
333
334    /// Try to find an event by ID in this room.
335    ///
336    /// It starts by looking into loaded events before looking inside the
337    /// storage.
338    pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
339        Ok(self
340            .inner
341            .state
342            .read()
343            .await?
344            .find_event(event_id)
345            .await
346            .ok()
347            .flatten()
348            .map(|(_loc, event)| event))
349    }
350
351    /// Try to find an event by ID in this room, along with its related events.
352    ///
353    /// You can filter which types of related events to retrieve using
354    /// `filter`. `None` will retrieve related events of any type.
355    ///
356    /// The related events are sorted like this:
357    ///
358    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
359    ///   located at the beginning of the array.
360    /// - events present in the linked chunk (be it in memory or in the storage)
361    ///   will be sorted according to their ordering in the linked chunk.
362    pub async fn find_event_with_relations(
363        &self,
364        event_id: &EventId,
365        filter: Option<Vec<RelationType>>,
366    ) -> Result<Option<(Event, Vec<Event>)>> {
367        // Search in all loaded or stored events.
368        Ok(self
369            .inner
370            .state
371            .read()
372            .await?
373            .find_event_with_relations(event_id, filter.clone())
374            .await
375            .ok()
376            .flatten())
377    }
378
379    /// Clear all the storage for this [`RoomEventCache`].
380    ///
381    /// This will get rid of all the events from the linked chunk and persisted
382    /// storage.
383    pub async fn clear(&self) -> Result<()> {
384        // Clear the linked chunk and persisted storage.
385        let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
386
387        // Notify observers about the update.
388        let _ = self.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
389            diffs: updates_as_vector_diffs,
390            origin: EventsOrigin::Cache,
391        });
392
393        // Notify observers about the generic update.
394        let _ = self
395            .inner
396            .generic_update_sender
397            .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });
398
399        Ok(())
400    }
401
402    /// Save some events in the event cache, for further retrieval with
403    /// [`Self::event`].
404    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
405        match self.inner.state.write().await {
406            Ok(mut state_guard) => {
407                if let Err(err) = state_guard.save_events(events).await {
408                    warn!("couldn't save event in the event cache: {err}");
409                }
410            }
411
412            Err(err) => {
413                warn!("couldn't save event in the event cache: {err}");
414            }
415        }
416    }
417
418    /// Return a nice debug string (a vector of lines) for the linked chunk of
419    /// events for this room.
420    pub async fn debug_string(&self) -> Vec<String> {
421        match self.inner.state.read().await {
422            Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
423            Err(err) => {
424                warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
425
426                vec![]
427            }
428        }
429    }
430}
431
432/// The (non-cloneable) details of the `RoomEventCache`.
433pub(super) struct RoomEventCacheInner {
434    /// The room id for this room.
435    pub(super) room_id: OwnedRoomId,
436
437    pub weak_room: WeakRoom,
438
439    /// State for this room's event cache.
440    pub state: RoomEventCacheStateLock,
441
442    /// A notifier that we received a new pagination token.
443    pub pagination_batch_token_notifier: Notify,
444
445    pub pagination_status: SharedObservable<RoomPaginationStatus>,
446
447    /// Sender to the auto-shrink channel.
448    ///
449    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
450    /// more details.
451    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
452
453    /// Sender part for update subscribers to this room.
454    pub update_sender: Sender<RoomEventCacheUpdate>,
455
456    /// A clone of [`EventCacheInner::generic_update_sender`].
457    ///
458    /// Whilst `EventCacheInner` handles the generic updates from the sync, or
459    /// the storage, it doesn't handle the update from pagination. Having a
460    /// clone here allows to access it from [`RoomPagination`].
461    pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
462}
463
464impl RoomEventCacheInner {
465    /// Creates a new cache for a room, and subscribes to room updates, so as
466    /// to handle new timeline events.
467    fn new(
468        client: WeakClient,
469        state: RoomEventCacheStateLock,
470        pagination_status: SharedObservable<RoomPaginationStatus>,
471        room_id: OwnedRoomId,
472        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
473        update_sender: Sender<RoomEventCacheUpdate>,
474        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
475    ) -> Self {
476        let weak_room = WeakRoom::new(client, room_id);
477
478        Self {
479            room_id: weak_room.room_id().to_owned(),
480            weak_room,
481            state,
482            update_sender,
483            pagination_batch_token_notifier: Default::default(),
484            auto_shrink_sender,
485            pagination_status,
486            generic_update_sender,
487        }
488    }
489
490    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
491        if account_data.is_empty() {
492            return;
493        }
494
495        let mut handled_read_marker = false;
496
497        trace!("Handling account data");
498
499        for raw_event in account_data {
500            match raw_event.deserialize() {
501                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
502                    // If duplicated, do not forward read marker multiple times
503                    // to avoid clutter the update channel.
504                    if handled_read_marker {
505                        continue;
506                    }
507
508                    handled_read_marker = true;
509
510                    // Propagate to observers. (We ignore the error if there aren't any.)
511                    let _ = self.update_sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
512                        event_id: ev.content.event_id,
513                    });
514                }
515
516                Ok(_) => {
517                    // We're not interested in other room account data updates,
518                    // at this point.
519                }
520
521                Err(e) => {
522                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
523                    warn!(event_type, "Failed to deserialize account data: {e}");
524                }
525            }
526        }
527    }
528
529    #[instrument(skip_all, fields(room_id = %self.room_id))]
530    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
531        self.handle_timeline(
532            updates.timeline,
533            updates.ephemeral.clone(),
534            updates.ambiguity_changes,
535        )
536        .await?;
537        self.handle_account_data(updates.account_data);
538
539        Ok(())
540    }
541
542    #[instrument(skip_all, fields(room_id = %self.room_id))]
543    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
544        self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
545
546        Ok(())
547    }
548
549    /// Handle a [`Timeline`], i.e. new events received by a sync for this
550    /// room.
551    async fn handle_timeline(
552        &self,
553        timeline: Timeline,
554        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
555        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
556    ) -> Result<()> {
557        if timeline.events.is_empty()
558            && timeline.prev_batch.is_none()
559            && ephemeral_events.is_empty()
560            && ambiguity_changes.is_empty()
561        {
562            return Ok(());
563        }
564
565        // Add all the events to the backend.
566        trace!("adding new events");
567
568        let (stored_prev_batch_token, timeline_event_diffs) =
569            self.state.write().await?.handle_sync(timeline).await?;
570
571        // Now that all events have been added, we can trigger the
572        // `pagination_token_notifier`.
573        if stored_prev_batch_token {
574            self.pagination_batch_token_notifier.notify_one();
575        }
576
577        // The order matters here: first send the timeline event diffs, then only the
578        // related events (read receipts, etc.).
579        if !timeline_event_diffs.is_empty() {
580            let _ = self.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
581                diffs: timeline_event_diffs,
582                origin: EventsOrigin::Sync,
583            });
584
585            let _ = self
586                .generic_update_sender
587                .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
588        }
589
590        if !ephemeral_events.is_empty() {
591            let _ = self
592                .update_sender
593                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
594        }
595
596        if !ambiguity_changes.is_empty() {
597            let _ =
598                self.update_sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
599        }
600
601        Ok(())
602    }
603}
604
605/// Internal type to represent the output of
606/// [`RoomEventCacheState::load_more_events_backwards`].
607#[derive(Debug)]
608pub(super) enum LoadMoreEventsBackwardsOutcome {
609    /// A gap has been inserted.
610    Gap {
611        /// The previous batch token to be used as the "end" parameter in the
612        /// back-pagination request.
613        prev_token: Option<String>,
614    },
615
616    /// The start of the timeline has been reached.
617    StartOfTimeline,
618
619    /// Events have been inserted.
620    Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
621}
622
623// Use a private module to hide `events` to this parent module.
624mod private {
625    use std::{
626        collections::{BTreeMap, HashMap, HashSet},
627        sync::{
628            Arc,
629            atomic::{AtomicBool, AtomicUsize, Ordering},
630        },
631    };
632
633    use eyeball::SharedObservable;
634    use eyeball_im::VectorDiff;
635    use matrix_sdk_base::{
636        apply_redaction,
637        deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
638        event_cache::{
639            Event, Gap,
640            store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState},
641        },
642        linked_chunk::{
643            ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
644            OwnedLinkedChunkId, Position, Update, lazy_loader,
645        },
646        serde_helpers::{extract_edit_target, extract_thread_root},
647        sync::Timeline,
648    };
649    use matrix_sdk_common::executor::spawn;
650    use ruma::{
651        EventId, OwnedEventId, OwnedRoomId, RoomId,
652        events::{
653            AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
654            relation::RelationType, room::redaction::SyncRoomRedactionEvent,
655        },
656        room_version_rules::RoomVersionRules,
657        serde::Raw,
658    };
659    use tokio::sync::{
660        Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
661        broadcast::{Receiver, Sender},
662    };
663    use tracing::{debug, error, instrument, trace, warn};
664
665    use super::{
666        super::{
667            BackPaginationOutcome, EventCacheError, RoomEventCacheLinkedChunkUpdate,
668            RoomPaginationStatus, ThreadEventCacheUpdate,
669            deduplicator::{DeduplicationOutcome, filter_duplicate_events},
670            room::threads::ThreadEventCache,
671        },
672        EventLocation, EventsOrigin, LoadMoreEventsBackwardsOutcome, RoomEventCacheGenericUpdate,
673        RoomEventCacheUpdate,
674        events::EventLinkedChunk,
675        sort_positions_descending,
676    };
677
678    /// State for a single room's event cache.
679    ///
680    /// This contains all the inner mutable states that ought to be updated at
681    /// the same time.
682    pub struct RoomEventCacheStateLock {
683        /// The per-thread lock around the real state.
684        locked_state: RwLock<RoomEventCacheStateLockInner>,
685
686        /// Please see inline comment of [`Self::read`] to understand why it
687        /// exists.
688        read_lock_acquisition: Mutex<()>,
689    }
690
691    struct RoomEventCacheStateLockInner {
692        /// Whether thread support has been enabled for the event cache.
693        enabled_thread_support: bool,
694
695        /// The room this state relates to.
696        room_id: OwnedRoomId,
697
698        /// Reference to the underlying backing store.
699        store: EventCacheStoreLock,
700
701        /// The loaded events for the current room, that is, the in-memory
702        /// linked chunk for this room.
703        room_linked_chunk: EventLinkedChunk,
704
705        /// Threads present in this room.
706        ///
707        /// Keyed by the thread root event ID.
708        threads: HashMap<OwnedEventId, ThreadEventCache>,
709
710        pagination_status: SharedObservable<RoomPaginationStatus>,
711
712        /// A clone of [`super::RoomEventCacheInner::update_sender`].
713        ///
714        /// This is used only by the [`RoomEventCacheStateLock::read`] and
715        /// [`RoomEventCacheStateLock::write`] when the state must be reset.
716        update_sender: Sender<RoomEventCacheUpdate>,
717
718        /// A clone of [`super::super::EventCacheInner::generic_update_sender`].
719        ///
720        /// This is used only by the [`RoomEventCacheStateLock::read`] and
721        /// [`RoomEventCacheStateLock::write`] when the state must be reset.
722        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
723
724        /// A clone of
725        /// [`super::super::EventCacheInner::linked_chunk_update_sender`].
726        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
727
728        /// The rules for the version of this room.
729        room_version_rules: RoomVersionRules,
730
731        /// Have we ever waited for a previous-batch-token to come from sync, in
732        /// the context of pagination? We do this at most once per room,
733        /// the first time we try to run backward pagination. We reset
734        /// that upon clearing the timeline events.
735        waited_for_initial_prev_token: Arc<AtomicBool>,
736
737        /// An atomic count of the current number of subscriber of the
738        /// [`super::RoomEventCache`].
739        subscriber_count: Arc<AtomicUsize>,
740    }
741
742    impl RoomEventCacheStateLock {
743        /// Create a new state, or reload it from storage if it's been enabled.
744        ///
745        /// Not all events are going to be loaded. Only a portion of them. The
746        /// [`EventLinkedChunk`] relies on a [`LinkedChunk`] to store all
747        /// events. Only the last chunk will be loaded. It means the
748        /// events are loaded from the most recent to the oldest. To
749        /// load more events, see [`RoomPagination`].
750        ///
751        /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
752        /// [`RoomPagination`]: super::RoomPagination
753        #[allow(clippy::too_many_arguments)]
754        pub async fn new(
755            room_id: OwnedRoomId,
756            room_version_rules: RoomVersionRules,
757            enabled_thread_support: bool,
758            update_sender: Sender<RoomEventCacheUpdate>,
759            generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
760            linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
761            store: EventCacheStoreLock,
762            pagination_status: SharedObservable<RoomPaginationStatus>,
763        ) -> Result<Self, EventCacheError> {
764            let store_guard = match store.lock().await? {
765                // Lock is clean: all good!
766                EventCacheStoreLockState::Clean(guard) => guard,
767
768                // Lock is dirty, not a problem, it's the first time we are creating this state, no
769                // need to refresh.
770                EventCacheStoreLockState::Dirty(guard) => {
771                    EventCacheStoreLockGuard::clear_dirty(&guard);
772
773                    guard
774                }
775            };
776
777            let linked_chunk_id = LinkedChunkId::Room(&room_id);
778
779            // Load the full linked chunk's metadata, so as to feed the order tracker.
780            //
781            // If loading the full linked chunk failed, we'll clear the event cache, as it
782            // indicates that at some point, there's some malformed data.
783            let full_linked_chunk_metadata =
784                match load_linked_chunk_metadata(&store_guard, linked_chunk_id).await {
785                    Ok(metas) => metas,
786                    Err(err) => {
787                        error!(
788                            "error when loading a linked chunk's metadata from the store: {err}"
789                        );
790
791                        // Try to clear storage for this room.
792                        store_guard
793                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
794                            .await?;
795
796                        // Restart with an empty linked chunk.
797                        None
798                    }
799                };
800
801            let linked_chunk = match store_guard
802                .load_last_chunk(linked_chunk_id)
803                .await
804                .map_err(EventCacheError::from)
805                .and_then(|(last_chunk, chunk_identifier_generator)| {
806                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
807                        .map_err(EventCacheError::from)
808                }) {
809                Ok(linked_chunk) => linked_chunk,
810                Err(err) => {
811                    error!(
812                        "error when loading a linked chunk's latest chunk from the store: {err}"
813                    );
814
815                    // Try to clear storage for this room.
816                    store_guard
817                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
818                        .await?;
819
820                    None
821                }
822            };
823
824            let waited_for_initial_prev_token = Arc::new(AtomicBool::new(false));
825
826            Ok(Self {
827                locked_state: RwLock::new(RoomEventCacheStateLockInner {
828                    enabled_thread_support,
829                    room_id,
830                    store,
831                    room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
832                        linked_chunk,
833                        full_linked_chunk_metadata,
834                    ),
835                    // The threads mapping is intentionally empty at start, since we're going to
836                    // reload threads lazily, as soon as we need to (based on external
837                    // subscribers) or when we get new information about those (from
838                    // sync).
839                    threads: HashMap::new(),
840                    pagination_status,
841                    update_sender,
842                    generic_update_sender,
843                    linked_chunk_update_sender,
844                    room_version_rules,
845                    waited_for_initial_prev_token,
846                    subscriber_count: Default::default(),
847                }),
848                read_lock_acquisition: Mutex::new(()),
849            })
850        }
851
852        /// Lock this [`RoomEventCacheStateLock`] with per-thread shared access.
853        ///
854        /// This method locks the per-thread lock over the state, and then locks
855        /// the cross-process lock over the store. It returns an RAII guard
856        /// which will drop the read access to the state and to the store when
857        /// dropped.
858        ///
859        /// If the cross-process lock over the store is dirty (see
860        /// [`EventCacheStoreLockState`]), the state is reset to the last chunk.
861        pub async fn read(&self) -> Result<RoomEventCacheStateLockReadGuard<'_>, EventCacheError> {
862            // Only one call at a time to `read` is allowed.
863            //
864            // Why? Because in case the cross-process lock over the store is dirty, we need
865            // to upgrade the read lock over the state to a write lock.
866            //
867            // ## Upgradable read lock
868            //
869            // One may argue that this upgrades can be done with an _upgradable read lock_
870            // [^1] [^2]. We don't want to use this solution: an upgradable read lock is
871            // basically a mutex because we are losing the shared access property, i.e.
872            // having multiple read locks at the same time. This is an important property to
873            // hold for performance concerns.
874            //
875            // ## Downgradable write lock
876            //
877            // One may also argue we could first obtain a write lock over the state from the
878            // beginning, thus removing the need to upgrade the read lock to a write lock.
879            // The write lock is then downgraded to a read lock once the dirty is cleaned
880            // up. It can potentially create a deadlock in the following situation:
881            //
882            // - `read` is called once, it takes a write lock, then downgrades it to a read
883            //   lock: the guard is kept alive somewhere,
884            // - `read` is called again, and waits to obtain the write lock, which is
885            //   impossible as long as the guard from the previous call is not dropped.
886            //
887            // ## “Atomic” read and write
888            //
889            // One may finally argue to first obtain a read lock over the state, then drop
890            // it if the cross-process lock over the store is dirty, and immediately obtain
891            // a write lock (which can later be downgraded to a read lock). The problem is
892            // that this write lock is async: anything can happen between the drop and the
893            // new lock acquisition, and it's not possible to pause the runtime in the
894            // meantime.
895            //
896            // ## Semaphore with 1 permit, aka a Mutex
897            //
898            // The chosen idea is to allow only one execution at a time of this method: it
899            // becomes a critical section. That way we are free to “upgrade” the read lock
900            // by dropping it and obtaining a new write lock. All callers to this method are
901            // waiting, so nothing can happen in the meantime.
902            //
903            // Note that it doesn't conflict with the `write` method because this later
904            // immediately obtains a write lock, which avoids any conflict with this method.
905            //
906            // [^1]: https://docs.rs/lock_api/0.4.14/lock_api/struct.RwLock.html#method.upgradable_read
907            // [^2]: https://docs.rs/async-lock/3.4.1/async_lock/struct.RwLock.html#method.upgradable_read
908            let _one_reader_guard = self.read_lock_acquisition.lock().await;
909
910            // Obtain a read lock.
911            let state_guard = self.locked_state.read().await;
912
913            match state_guard.store.lock().await? {
914                EventCacheStoreLockState::Clean(store_guard) => {
915                    Ok(RoomEventCacheStateLockReadGuard { state: state_guard, store: store_guard })
916                }
917                EventCacheStoreLockState::Dirty(store_guard) => {
918                    // Drop the read lock, and take a write lock to modify the state.
919                    // This is safe because only one reader at a time (see
920                    // `Self::read_lock_acquisition`) is allowed.
921                    drop(state_guard);
922                    let state_guard = self.locked_state.write().await;
923
924                    let mut guard = RoomEventCacheStateLockWriteGuard {
925                        state: state_guard,
926                        store: store_guard,
927                    };
928
929                    // Force to reload by shrinking to the last chunk.
930                    let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;
931
932                    // All good now, mark the cross-process lock as non-dirty.
933                    EventCacheStoreLockGuard::clear_dirty(&guard.store);
934
935                    // Downgrade the guard as soon as possible.
936                    let guard = guard.downgrade();
937
938                    // Now let the world know about the reload.
939                    if !updates_as_vector_diffs.is_empty() {
940                        // Notify observers about the update.
941                        let _ = guard.state.update_sender.send(
942                            RoomEventCacheUpdate::UpdateTimelineEvents {
943                                diffs: updates_as_vector_diffs,
944                                origin: EventsOrigin::Cache,
945                            },
946                        );
947
948                        // Notify observers about the generic update.
949                        let _ =
950                            guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
951                                room_id: guard.state.room_id.clone(),
952                            });
953                    }
954
955                    Ok(guard)
956                }
957            }
958        }
959
960        /// Lock this [`RoomEventCacheStateLock`] with exclusive per-thread
961        /// write access.
962        ///
963        /// This method locks the per-thread lock over the state, and then locks
964        /// the cross-process lock over the store. It returns an RAII guard
965        /// which will drop the write access to the state and to the store when
966        /// dropped.
967        ///
968        /// If the cross-process lock over the store is dirty (see
969        /// [`EventCacheStoreLockState`]), the state is reset to the last chunk.
970        pub async fn write(
971            &self,
972        ) -> Result<RoomEventCacheStateLockWriteGuard<'_>, EventCacheError> {
973            let state_guard = self.locked_state.write().await;
974
975            match state_guard.store.lock().await? {
976                EventCacheStoreLockState::Clean(store_guard) => {
977                    Ok(RoomEventCacheStateLockWriteGuard { state: state_guard, store: store_guard })
978                }
979                EventCacheStoreLockState::Dirty(store_guard) => {
980                    let mut guard = RoomEventCacheStateLockWriteGuard {
981                        state: state_guard,
982                        store: store_guard,
983                    };
984
985                    // Force to reload by shrinking to the last chunk.
986                    let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;
987
988                    // All good now, mark the cross-process lock as non-dirty.
989                    EventCacheStoreLockGuard::clear_dirty(&guard.store);
990
991                    // Now let the world know about the reload.
992                    if !updates_as_vector_diffs.is_empty() {
993                        // Notify observers about the update.
994                        let _ = guard.state.update_sender.send(
995                            RoomEventCacheUpdate::UpdateTimelineEvents {
996                                diffs: updates_as_vector_diffs,
997                                origin: EventsOrigin::Cache,
998                            },
999                        );
1000
1001                        // Notify observers about the generic update.
1002                        let _ =
1003                            guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
1004                                room_id: guard.state.room_id.clone(),
1005                            });
1006                    }
1007
1008                    Ok(guard)
1009                }
1010            }
1011        }
1012    }
1013
1014    /// The read lock guard returned by [`RoomEventCacheStateLock::read`].
1015    pub struct RoomEventCacheStateLockReadGuard<'a> {
1016        /// The per-thread read lock guard over the
1017        /// [`RoomEventCacheStateLockInner`].
1018        state: RwLockReadGuard<'a, RoomEventCacheStateLockInner>,
1019
1020        /// The cross-process lock guard over the store.
1021        store: EventCacheStoreLockGuard,
1022    }
1023
1024    /// The write lock guard return by [`RoomEventCacheStateLock::write`].
1025    pub struct RoomEventCacheStateLockWriteGuard<'a> {
1026        /// The per-thread write lock guard over the
1027        /// [`RoomEventCacheStateLockInner`].
1028        state: RwLockWriteGuard<'a, RoomEventCacheStateLockInner>,
1029
1030        /// The cross-process lock guard over the store.
1031        store: EventCacheStoreLockGuard,
1032    }
1033
1034    impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
1035        /// Synchronously downgrades a write lock into a read lock.
1036        ///
1037        /// The per-thread/state lock is downgraded atomically, without allowing
1038        /// any writers to take exclusive access of the lock in the meantime.
1039        ///
1040        /// It returns an RAII guard which will drop the write access to the
1041        /// state and to the store when dropped.
1042        fn downgrade(self) -> RoomEventCacheStateLockReadGuard<'a> {
1043            RoomEventCacheStateLockReadGuard { state: self.state.downgrade(), store: self.store }
1044        }
1045    }
1046
1047    impl<'a> RoomEventCacheStateLockReadGuard<'a> {
1048        /// Returns a read-only reference to the underlying room linked chunk.
1049        pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
1050            &self.state.room_linked_chunk
1051        }
1052
1053        pub fn subscriber_count(&self) -> &Arc<AtomicUsize> {
1054            &self.state.subscriber_count
1055        }
1056
1057        /// Find a single event in this room.
1058        ///
1059        /// It starts by looking into loaded events in `EventLinkedChunk` before
1060        /// looking inside the storage.
1061        pub async fn find_event(
1062            &self,
1063            event_id: &EventId,
1064        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1065            find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1066                .await
1067        }
1068
1069        /// Find an event and all its relations in the persisted storage.
1070        ///
1071        /// This goes straight to the database, as a simplification; we don't
1072        /// expect to need to have to look up in memory events, or that
1073        /// all the related events are actually loaded.
1074        ///
1075        /// The related events are sorted like this:
1076        /// - events saved out-of-band with
1077        ///   [`super::RoomEventCache::save_events`] will be located at the
1078        ///   beginning of the array.
1079        /// - events present in the linked chunk (be it in memory or in the
1080        ///   database) will be sorted according to their ordering in the linked
1081        ///   chunk.
1082        pub async fn find_event_with_relations(
1083            &self,
1084            event_id: &EventId,
1085            filters: Option<Vec<RelationType>>,
1086        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1087            find_event_with_relations(
1088                event_id,
1089                &self.state.room_id,
1090                filters,
1091                &self.state.room_linked_chunk,
1092                &self.store,
1093            )
1094            .await
1095        }
1096
1097        //// Find a single event in this room, starting from the most recent event.
1098        ///
1099        /// **Warning**! It looks into the loaded events from the in-memory
1100        /// linked chunk **only**. It doesn't look inside the storage,
1101        /// contrary to [`Self::find_event`].
1102        pub fn rfind_map_event_in_memory_by<O, P>(&self, mut predicate: P) -> Option<O>
1103        where
1104            P: FnMut(&Event) -> Option<O>,
1105        {
1106            self.state.room_linked_chunk.revents().find_map(|(_position, event)| predicate(event))
1107        }
1108
1109        #[cfg(test)]
1110        pub fn is_dirty(&self) -> bool {
1111            EventCacheStoreLockGuard::is_dirty(&self.store)
1112        }
1113    }
1114
1115    impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
1116        /// Returns a write reference to the underlying room linked chunk.
1117        #[cfg(any(feature = "e2e-encryption", test))]
1118        pub fn room_linked_chunk(&mut self) -> &mut EventLinkedChunk {
1119            &mut self.state.room_linked_chunk
1120        }
1121
1122        /// Get a reference to the `waited_for_initial_prev_token` atomic bool.
1123        pub fn waited_for_initial_prev_token(&self) -> &Arc<AtomicBool> {
1124            &self.state.waited_for_initial_prev_token
1125        }
1126
1127        /// Find a single event in this room.
1128        ///
1129        /// It starts by looking into loaded events in `EventLinkedChunk` before
1130        /// looking inside the storage.
1131        pub async fn find_event(
1132            &self,
1133            event_id: &EventId,
1134        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1135            find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1136                .await
1137        }
1138
1139        /// Find an event and all its relations in the persisted storage.
1140        ///
1141        /// This goes straight to the database, as a simplification; we don't
1142        /// expect to need to have to look up in memory events, or that
1143        /// all the related events are actually loaded.
1144        ///
1145        /// The related events are sorted like this:
1146        /// - events saved out-of-band with
1147        ///   [`super::RoomEventCache::save_events`] will be located at the
1148        ///   beginning of the array.
1149        /// - events present in the linked chunk (be it in memory or in the
1150        ///   database) will be sorted according to their ordering in the linked
1151        ///   chunk.
1152        pub async fn find_event_with_relations(
1153            &self,
1154            event_id: &EventId,
1155            filters: Option<Vec<RelationType>>,
1156        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1157            find_event_with_relations(
1158                event_id,
1159                &self.state.room_id,
1160                filters,
1161                &self.state.room_linked_chunk,
1162                &self.store,
1163            )
1164            .await
1165        }
1166
1167        /// Load more events backwards if the last chunk is **not** a gap.
1168        pub async fn load_more_events_backwards(
1169            &mut self,
1170        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
1171            // If any in-memory chunk is a gap, don't load more events, and let the caller
1172            // resolve the gap.
1173            if let Some(prev_token) = self.state.room_linked_chunk.rgap().map(|gap| gap.prev_token)
1174            {
1175                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
1176            }
1177
1178            let prev_first_chunk = self
1179                .state
1180                .room_linked_chunk
1181                .chunks()
1182                .next()
1183                .expect("a linked chunk is never empty");
1184
1185            // The first chunk is not a gap, we can load its previous chunk.
1186            let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
1187            let new_first_chunk = match self
1188                .store
1189                .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
1190                .await
1191            {
1192                Ok(Some(new_first_chunk)) => {
1193                    // All good, let's continue with this chunk.
1194                    new_first_chunk
1195                }
1196
1197                Ok(None) => {
1198                    // If we never received events for this room, this means we've never received a
1199                    // sync for that room, because every room must have *at least* a room creation
1200                    // event. Otherwise, we have reached the start of the timeline.
1201
1202                    if self.state.room_linked_chunk.events().next().is_some() {
1203                        // If there's at least one event, this means we've reached the start of the
1204                        // timeline, since the chunk is fully loaded.
1205                        trace!("chunk is fully loaded and non-empty: reached_start=true");
1206                        return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
1207                    }
1208
1209                    // Otherwise, start back-pagination from the end of the room.
1210                    return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: None });
1211                }
1212
1213                Err(err) => {
1214                    error!("error when loading the previous chunk of a linked chunk: {err}");
1215
1216                    // Clear storage for this room.
1217                    self.store
1218                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
1219                        .await?;
1220
1221                    // Return the error.
1222                    return Err(err.into());
1223                }
1224            };
1225
1226            let chunk_content = new_first_chunk.content.clone();
1227
1228            // We've reached the start on disk, if and only if, there was no chunk prior to
1229            // the one we just loaded.
1230            //
1231            // This value is correct, if and only if, it is used for a chunk content of kind
1232            // `Items`.
1233            let reached_start = new_first_chunk.previous.is_none();
1234
1235            if let Err(err) =
1236                self.state.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk)
1237            {
1238                error!("error when inserting the previous chunk into its linked chunk: {err}");
1239
1240                // Clear storage for this room.
1241                self.store
1242                    .handle_linked_chunk_updates(
1243                        LinkedChunkId::Room(&self.state.room_id),
1244                        vec![Update::Clear],
1245                    )
1246                    .await?;
1247
1248                // Return the error.
1249                return Err(err.into());
1250            }
1251
1252            // ⚠️ Let's not propagate the updates to the store! We already have these data
1253            // in the store! Let's drain them.
1254            let _ = self.state.room_linked_chunk.store_updates().take();
1255
1256            // However, we want to get updates as `VectorDiff`s.
1257            let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1258
1259            Ok(match chunk_content {
1260                ChunkContent::Gap(gap) => {
1261                    trace!("reloaded chunk from disk (gap)");
1262                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
1263                }
1264
1265                ChunkContent::Items(events) => {
1266                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
1267                    LoadMoreEventsBackwardsOutcome::Events {
1268                        events,
1269                        timeline_event_diffs,
1270                        reached_start,
1271                    }
1272                }
1273            })
1274        }
1275
1276        /// If storage is enabled, unload all the chunks, then reloads only the
1277        /// last one.
1278        ///
1279        /// If storage's enabled, return a diff update that starts with a clear
1280        /// of all events; as a result, the caller may override any
1281        /// pending diff updates with the result of this function.
1282        ///
1283        /// Otherwise, returns `None`.
1284        pub async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
1285            // Attempt to load the last chunk.
1286            let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
1287            let (last_chunk, chunk_identifier_generator) =
1288                match self.store.load_last_chunk(linked_chunk_id).await {
1289                    Ok(pair) => pair,
1290
1291                    Err(err) => {
1292                        // If loading the last chunk failed, clear the entire linked chunk.
1293                        error!("error when reloading a linked chunk from memory: {err}");
1294
1295                        // Clear storage for this room.
1296                        self.store
1297                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
1298                            .await?;
1299
1300                        // Restart with an empty linked chunk.
1301                        (None, ChunkIdentifierGenerator::new_from_scratch())
1302                    }
1303                };
1304
1305            debug!("unloading the linked chunk, and resetting it to its last chunk");
1306
1307            // Remove all the chunks from the linked chunks, except for the last one, and
1308            // updates the chunk identifier generator.
1309            if let Err(err) =
1310                self.state.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
1311            {
1312                error!("error when replacing the linked chunk: {err}");
1313                return self.reset_internal().await;
1314            }
1315
1316            // Let pagination observers know that we may have not reached the start of the
1317            // timeline.
1318            // TODO: likely need to cancel any ongoing pagination.
1319            self.state
1320                .pagination_status
1321                .set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1322
1323            // Don't propagate those updates to the store; this is only for the in-memory
1324            // representation that we're doing this. Let's drain those store updates.
1325            let _ = self.state.room_linked_chunk.store_updates().take();
1326
1327            Ok(())
1328        }
1329
1330        /// Automatically shrink the room if there are no more subscribers, as
1331        /// indicated by the atomic number of active subscribers.
1332        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1333        pub async fn auto_shrink_if_no_subscribers(
1334            &mut self,
1335        ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
1336            let subscriber_count = self.state.subscriber_count.load(Ordering::SeqCst);
1337
1338            trace!(subscriber_count, "received request to auto-shrink");
1339
1340            if subscriber_count == 0 {
1341                // If we are the last strong reference to the auto-shrinker, we can shrink the
1342                // events data structure to its last chunk.
1343                self.shrink_to_last_chunk().await?;
1344
1345                Ok(Some(self.state.room_linked_chunk.updates_as_vector_diffs()))
1346            } else {
1347                Ok(None)
1348            }
1349        }
1350
1351        /// Force to shrink the room, whenever there is subscribers or not.
1352        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1353        pub async fn force_shrink_to_last_chunk(
1354            &mut self,
1355        ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1356            self.shrink_to_last_chunk().await?;
1357
1358            Ok(self.state.room_linked_chunk.updates_as_vector_diffs())
1359        }
1360
1361        /// Remove events by their position, in `EventLinkedChunk` and in
1362        /// `EventCacheStore`.
1363        ///
1364        /// This method is purposely isolated because it must ensure that
1365        /// positions are sorted appropriately or it can be disastrous.
1366        #[instrument(skip_all)]
1367        pub async fn remove_events(
1368            &mut self,
1369            in_memory_events: Vec<(OwnedEventId, Position)>,
1370            in_store_events: Vec<(OwnedEventId, Position)>,
1371        ) -> Result<(), EventCacheError> {
1372            // In-store events.
1373            if !in_store_events.is_empty() {
1374                let mut positions = in_store_events
1375                    .into_iter()
1376                    .map(|(_event_id, position)| position)
1377                    .collect::<Vec<_>>();
1378
1379                sort_positions_descending(&mut positions);
1380
1381                let updates = positions
1382                    .into_iter()
1383                    .map(|pos| Update::RemoveItem { at: pos })
1384                    .collect::<Vec<_>>();
1385
1386                self.apply_store_only_updates(updates).await?;
1387            }
1388
1389            // In-memory events.
1390            if in_memory_events.is_empty() {
1391                // Nothing else to do, return early.
1392                return Ok(());
1393            }
1394
1395            // `remove_events_by_position` is responsible of sorting positions.
1396            self.state
1397                .room_linked_chunk
1398                .remove_events_by_position(
1399                    in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
1400                )
1401                .expect("failed to remove an event");
1402
1403            self.propagate_changes().await
1404        }
1405
1406        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1407            let updates = self.state.room_linked_chunk.store_updates().take();
1408            self.send_updates_to_store(updates).await
1409        }
1410
1411        /// Apply some updates that are effective only on the store itself.
1412        ///
1413        /// This method should be used only for updates that happen *outside*
1414        /// the in-memory linked chunk. Such updates must be applied
1415        /// onto the ordering tracker as well as to the persistent
1416        /// storage.
1417        async fn apply_store_only_updates(
1418            &mut self,
1419            updates: Vec<Update<Event, Gap>>,
1420        ) -> Result<(), EventCacheError> {
1421            self.state.room_linked_chunk.order_tracker.map_updates(&updates);
1422            self.send_updates_to_store(updates).await
1423        }
1424
1425        async fn send_updates_to_store(
1426            &mut self,
1427            mut updates: Vec<Update<Event, Gap>>,
1428        ) -> Result<(), EventCacheError> {
1429            if updates.is_empty() {
1430                return Ok(());
1431            }
1432
1433            // Strip relations from updates which insert or replace items.
1434            for update in updates.iter_mut() {
1435                match update {
1436                    Update::PushItems { items, .. } => strip_relations_from_events(items),
1437                    Update::ReplaceItem { item, .. } => strip_relations_from_event(item),
1438                    // Other update kinds don't involve adding new events.
1439                    Update::NewItemsChunk { .. }
1440                    | Update::NewGapChunk { .. }
1441                    | Update::RemoveChunk(_)
1442                    | Update::RemoveItem { .. }
1443                    | Update::DetachLastItems { .. }
1444                    | Update::StartReattachItems
1445                    | Update::EndReattachItems
1446                    | Update::Clear => {}
1447                }
1448            }
1449
1450            // Spawn a task to make sure that all the changes are effectively forwarded to
1451            // the store, even if the call to this method gets aborted.
1452            //
1453            // The store cross-process locking involves an actual mutex, which ensures that
1454            // storing updates happens in the expected order.
1455
1456            let store = self.store.clone();
1457            let room_id = self.state.room_id.clone();
1458            let cloned_updates = updates.clone();
1459
1460            spawn(async move {
1461                trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
1462                let linked_chunk_id = LinkedChunkId::Room(&room_id);
1463                store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
1464                trace!("linked chunk updates applied");
1465
1466                super::Result::Ok(())
1467            })
1468            .await
1469            .expect("joining failed")?;
1470
1471            // Forward that the store got updated to observers.
1472            let _ = self.state.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
1473                linked_chunk_id: OwnedLinkedChunkId::Room(self.state.room_id.clone()),
1474                updates,
1475            });
1476
1477            Ok(())
1478        }
1479
1480        /// Reset this data structure as if it were brand new.
1481        ///
1482        /// Return a single diff update that is a clear of all events; as a
1483        /// result, the caller may override any pending diff updates
1484        /// with the result of this function.
1485        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1486            self.reset_internal().await?;
1487
1488            let diff_updates = self.state.room_linked_chunk.updates_as_vector_diffs();
1489
1490            // Ensure the contract defined in the doc comment is true:
1491            debug_assert_eq!(diff_updates.len(), 1);
1492            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1493
1494            Ok(diff_updates)
1495        }
1496
1497        async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
1498            self.state.room_linked_chunk.reset();
1499
1500            // No need to update the thread summaries: the room events are
1501            // gone because of the reset of `room_linked_chunk`.
1502            //
1503            // Clear the threads.
1504            for thread in self.state.threads.values_mut() {
1505                thread.clear();
1506            }
1507
1508            self.propagate_changes().await?;
1509
1510            // Reset the pagination state too: pretend we never waited for the initial
1511            // prev-batch token, and indicate that we're not at the start of the
1512            // timeline, since we don't know about that anymore.
1513            self.state.waited_for_initial_prev_token.store(false, Ordering::SeqCst);
1514            // TODO: likely must cancel any ongoing back-paginations too
1515            self.state
1516                .pagination_status
1517                .set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1518
1519            Ok(())
1520        }
1521
1522        /// Handle the result of a sync.
1523        ///
1524        /// It may send room event cache updates to the given sender, if it
1525        /// generated any of those.
1526        ///
1527        /// Returns `true` for the first part of the tuple if a new gap
1528        /// (previous-batch token) has been inserted, `false` otherwise.
1529        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1530        pub async fn handle_sync(
1531            &mut self,
1532            mut timeline: Timeline,
1533        ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
1534            let mut prev_batch = timeline.prev_batch.take();
1535
1536            let DeduplicationOutcome {
1537                all_events: events,
1538                in_memory_duplicated_event_ids,
1539                in_store_duplicated_event_ids,
1540                non_empty_all_duplicates: all_duplicates,
1541            } = filter_duplicate_events(
1542                &self.store,
1543                LinkedChunkId::Room(&self.state.room_id),
1544                &self.state.room_linked_chunk,
1545                timeline.events,
1546            )
1547            .await?;
1548
1549            // If the timeline isn't limited, and we already knew about some past events,
1550            // then this definitely knows what the timeline head is (either we know
1551            // about all the events persisted in storage, or we have a gap
1552            // somewhere). In this case, we can ditch the previous-batch
1553            // token, which is an optimization to avoid unnecessary future back-pagination
1554            // requests.
1555            //
1556            // We can also ditch it if we knew about all the events that came from sync,
1557            // namely, they were all deduplicated. In this case, using the
1558            // previous-batch token would only result in fetching other events we
1559            // knew about. This is slightly incorrect in the presence of
1560            // network splits, but this has shown to be Good Enough™.
1561            if !timeline.limited && self.state.room_linked_chunk.events().next().is_some()
1562                || all_duplicates
1563            {
1564                prev_batch = None;
1565            }
1566
1567            if prev_batch.is_some() {
1568                // Sad time: there's a gap, somewhere, in the timeline, and there's at least one
1569                // non-duplicated event. We don't know which threads might have gappy, so we
1570                // must invalidate them all :(
1571                // TODO(bnjbvr): figure out a better catchup mechanism for threads.
1572                let mut summaries_to_update = Vec::new();
1573
1574                for (thread_root, thread) in self.state.threads.iter_mut() {
1575                    // Empty the thread's linked chunk.
1576                    thread.clear();
1577
1578                    summaries_to_update.push(thread_root.clone());
1579                }
1580
1581                // Now, update the summaries to indicate that we're not sure what the latest
1582                // thread event is. The thread count can remain as is, as it might still be
1583                // valid, and there's no good value to reset it to, anyways.
1584                for thread_root in summaries_to_update {
1585                    let Some((location, mut target_event)) = self.find_event(&thread_root).await?
1586                    else {
1587                        trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
1588                        continue;
1589                    };
1590
1591                    if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
1592                        prev_summary.latest_reply = None;
1593
1594                        target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);
1595
1596                        self.replace_event_at(location, target_event).await?;
1597                    }
1598                }
1599            }
1600
1601            if all_duplicates {
1602                // No new events and no gap (per the previous check), thus no need to change the
1603                // room state. We're done!
1604                return Ok((false, Vec::new()));
1605            }
1606
1607            let has_new_gap = prev_batch.is_some();
1608
1609            // If we've never waited for an initial previous-batch token, and we've now
1610            // inserted a gap, no need to wait for a previous-batch token later.
1611            if !self.state.waited_for_initial_prev_token.load(Ordering::SeqCst) && has_new_gap {
1612                self.state.waited_for_initial_prev_token.store(true, Ordering::SeqCst);
1613            }
1614
1615            // Remove the old duplicated events.
1616            //
1617            // We don't have to worry the removals can change the position of the existing
1618            // events, because we are pushing all _new_ `events` at the back.
1619            self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1620                .await?;
1621
1622            self.state
1623                .room_linked_chunk
1624                .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);
1625
1626            self.post_process_new_events(events, true).await?;
1627
1628            if timeline.limited && has_new_gap {
1629                // If there was a previous batch token for a limited timeline, unload the chunks
1630                // so it only contains the last one; otherwise, there might be a
1631                // valid gap in between, and observers may not render it (yet).
1632                //
1633                // We must do this *after* persisting these events to storage (in
1634                // `post_process_new_events`).
1635                self.shrink_to_last_chunk().await?;
1636            }
1637
1638            let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1639
1640            Ok((has_new_gap, timeline_event_diffs))
1641        }
1642
1643        /// Handle the result of a single back-pagination request.
1644        ///
1645        /// If the `prev_token` is set, then this function will check that the
1646        /// corresponding gap is present in the in-memory linked chunk.
1647        /// If it's not the case, `Ok(None)` will be returned, and the
1648        /// caller may decide to do something based on that (e.g. restart a
1649        /// pagination).
1650        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1651        pub async fn handle_backpagination(
1652            &mut self,
1653            events: Vec<Event>,
1654            mut new_token: Option<String>,
1655            prev_token: Option<String>,
1656        ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
1657        {
1658            // Check that the previous token still exists; otherwise it's a sign that the
1659            // room's timeline has been cleared.
1660            let prev_gap_id = if let Some(token) = prev_token {
1661                // Find the corresponding gap in the in-memory linked chunk.
1662                let gap_chunk_id = self.state.room_linked_chunk.chunk_identifier(|chunk| {
1663                    matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
1664                });
1665
1666                if gap_chunk_id.is_none() {
1667                    // We got a previous-batch token from the linked chunk *before* running the
1668                    // request, but it is missing *after* completing the request.
1669                    //
1670                    // It may be a sign the linked chunk has been reset, but it's fine, per this
1671                    // function's contract.
1672                    return Ok(None);
1673                }
1674
1675                gap_chunk_id
1676            } else {
1677                None
1678            };
1679
1680            let DeduplicationOutcome {
1681                all_events: mut events,
1682                in_memory_duplicated_event_ids,
1683                in_store_duplicated_event_ids,
1684                non_empty_all_duplicates: all_duplicates,
1685            } = filter_duplicate_events(
1686                &self.store,
1687                LinkedChunkId::Room(&self.state.room_id),
1688                &self.state.room_linked_chunk,
1689                events,
1690            )
1691            .await?;
1692
1693            // If not all the events have been back-paginated, we need to remove the
1694            // previous ones, otherwise we can end up with misordered events.
1695            //
1696            // Consider the following scenario:
1697            // - sync returns [D, E, F]
1698            // - then sync returns [] with a previous batch token PB1, so the internal
1699            //   linked chunk state is [D, E, F, PB1].
1700            // - back-paginating with PB1 may return [A, B, C, D, E, F].
1701            //
1702            // Only inserting the new events when replacing PB1 would result in a timeline
1703            // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
1704            // all the events, in case this happens (see also #4746).
1705
1706            if !all_duplicates {
1707                // Let's forget all the previous events.
1708                self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1709                    .await?;
1710            } else {
1711                // All new events are duplicated, they can all be ignored.
1712                events.clear();
1713                // The gap can be ditched too, as it won't be useful to backpaginate any
1714                // further.
1715                new_token = None;
1716            }
1717
1718            // `/messages` has been called with `dir=b` (backwards), so the events are in
1719            // the inverted order; reorder them.
1720            let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();
1721
1722            let new_gap = new_token.map(|prev_token| Gap { prev_token });
1723            let reached_start = self.state.room_linked_chunk.finish_back_pagination(
1724                prev_gap_id,
1725                new_gap,
1726                &topo_ordered_events,
1727            );
1728
1729            // Note: this flushes updates to the store.
1730            self.post_process_new_events(topo_ordered_events, false).await?;
1731
1732            let event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1733
1734            Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
1735        }
1736
1737        /// Subscribe to thread for a given root event, and get a (maybe empty)
1738        /// initially known list of events for that thread.
1739        pub fn subscribe_to_thread(
1740            &mut self,
1741            root: OwnedEventId,
1742        ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
1743            self.get_or_reload_thread(root).subscribe()
1744        }
1745
1746        /// Back paginate in the given thread.
1747        ///
1748        /// Will always start from the end, unless we previously paginated.
1749        pub fn finish_thread_network_pagination(
1750            &mut self,
1751            root: OwnedEventId,
1752            prev_token: Option<String>,
1753            new_token: Option<String>,
1754            events: Vec<Event>,
1755        ) -> Option<BackPaginationOutcome> {
1756            self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
1757        }
1758
1759        pub fn load_more_thread_events_backwards(
1760            &mut self,
1761            root: OwnedEventId,
1762        ) -> LoadMoreEventsBackwardsOutcome {
1763            self.get_or_reload_thread(root).load_more_events_backwards()
1764        }
1765
1766        // --------------------------------------------
1767        // utility methods
1768        // --------------------------------------------
1769
1770        /// Post-process new events, after they have been added to the in-memory
1771        /// linked chunk.
1772        ///
1773        /// Flushes updates to disk first.
1774        pub(in super::super) async fn post_process_new_events(
1775            &mut self,
1776            events: Vec<Event>,
1777            is_sync: bool,
1778        ) -> Result<(), EventCacheError> {
1779            // Update the store before doing the post-processing.
1780            self.propagate_changes().await?;
1781
1782            let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();
1783
1784            for event in events {
1785                self.maybe_apply_new_redaction(&event).await?;
1786
1787                if self.state.enabled_thread_support {
1788                    // Only add the event to a thread if:
1789                    // - thread support is enabled,
1790                    // - and if this is a sync (we can't know where to insert backpaginated events
1791                    //   in threads).
1792                    if is_sync {
1793                        if let Some(thread_root) = extract_thread_root(event.raw()) {
1794                            new_events_by_thread
1795                                .entry(thread_root)
1796                                .or_default()
1797                                .push(event.clone());
1798                        } else if let Some(event_id) = event.event_id() {
1799                            // If we spot the root of a thread, add it to its linked chunk.
1800                            if self.state.threads.contains_key(&event_id) {
1801                                new_events_by_thread
1802                                    .entry(event_id)
1803                                    .or_default()
1804                                    .push(event.clone());
1805                            }
1806                        }
1807                    }
1808
1809                    // Look for edits that may apply to a thread; we'll process them later.
1810                    if let Some(edit_target) = extract_edit_target(event.raw()) {
1811                        // If the edited event is known, and part of a thread,
1812                        if let Some((_location, edit_target_event)) =
1813                            self.find_event(&edit_target).await?
1814                            && let Some(thread_root) = extract_thread_root(edit_target_event.raw())
1815                        {
1816                            // Mark the thread for processing, unless it was already marked as
1817                            // such.
1818                            new_events_by_thread.entry(thread_root).or_default();
1819                        }
1820                    }
1821                }
1822
1823                // Save a bundled thread event, if there was one.
1824                if let Some(bundled_thread) = event.bundled_latest_thread_event {
1825                    self.save_events([*bundled_thread]).await?;
1826                }
1827            }
1828
1829            if self.state.enabled_thread_support {
1830                self.update_threads(new_events_by_thread).await?;
1831            }
1832
1833            Ok(())
1834        }
1835
1836        fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
1837            // TODO: when there's persistent storage, try to lazily reload from disk, if
1838            // missing from memory.
1839            let room_id = self.state.room_id.clone();
1840            let linked_chunk_update_sender = self.state.linked_chunk_update_sender.clone();
1841
1842            self.state.threads.entry(root_event_id.clone()).or_insert_with(|| {
1843                ThreadEventCache::new(room_id, root_event_id, linked_chunk_update_sender)
1844            })
1845        }
1846
1847        #[instrument(skip_all)]
1848        async fn update_threads(
1849            &mut self,
1850            new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
1851        ) -> Result<(), EventCacheError> {
1852            for (thread_root, new_events) in new_events_by_thread {
1853                let thread_cache = self.get_or_reload_thread(thread_root.clone());
1854
1855                thread_cache.add_live_events(new_events);
1856
1857                let mut latest_event_id = thread_cache.latest_event_id();
1858
1859                // If there's an edit to the latest event in the thread, use the latest edit
1860                // event id as the latest event id for the thread summary.
1861                if let Some(event_id) = latest_event_id.as_ref()
1862                    && let Some((_, edits)) = self
1863                        .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
1864                        .await?
1865                    && let Some(latest_edit) = edits.last()
1866                {
1867                    latest_event_id = latest_edit.event_id();
1868                }
1869
1870                self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
1871            }
1872
1873            Ok(())
1874        }
1875
1876        /// Update a thread summary on the given thread root, if needs be.
1877        async fn maybe_update_thread_summary(
1878            &mut self,
1879            thread_root: OwnedEventId,
1880            latest_event_id: Option<OwnedEventId>,
1881        ) -> Result<(), EventCacheError> {
1882            // Add a thread summary to the (room) event which has the thread root, if we
1883            // knew about it.
1884
1885            let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
1886                trace!(%thread_root, "thread root event is missing from the room linked chunk");
1887                return Ok(());
1888            };
1889
1890            let prev_summary = target_event.thread_summary.summary();
1891
1892            // Recompute the thread summary, if needs be.
1893
1894            // Read the latest number of thread replies from the store.
1895            //
1896            // Implementation note: since this is based on the `m.relates_to` field, and
1897            // that field can only be present on room messages, we don't have to
1898            // worry about filtering out aggregation events (like
1899            // reactions/edits/etc.). Pretty neat, huh?
1900            let num_replies = {
1901                let thread_replies = self
1902                    .store
1903                    .find_event_relations(
1904                        &self.state.room_id,
1905                        &thread_root,
1906                        Some(&[RelationType::Thread]),
1907                    )
1908                    .await?;
1909                thread_replies.len().try_into().unwrap_or(u32::MAX)
1910            };
1911
1912            let new_summary = if num_replies > 0 {
1913                Some(ThreadSummary { num_replies, latest_reply: latest_event_id })
1914            } else {
1915                None
1916            };
1917
1918            if prev_summary == new_summary.as_ref() {
1919                trace!(%thread_root, "thread summary is already up-to-date");
1920                return Ok(());
1921            }
1922
1923            // Trigger an update to observers.
1924            trace!(%thread_root, "updating thread summary: {new_summary:?}");
1925            target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary);
1926            self.replace_event_at(location, target_event).await
1927        }
1928
1929        /// Replaces a single event, be it saved in memory or in the store.
1930        ///
1931        /// If it was saved in memory, this will emit a notification to
1932        /// observers that a single item has been replaced. Otherwise,
1933        /// such a notification is not emitted, because observers are
1934        /// unlikely to observe the store updates directly.
1935        pub(crate) async fn replace_event_at(
1936            &mut self,
1937            location: EventLocation,
1938            event: Event,
1939        ) -> Result<(), EventCacheError> {
1940            match location {
1941                EventLocation::Memory(position) => {
1942                    self.state
1943                        .room_linked_chunk
1944                        .replace_event_at(position, event)
1945                        .expect("should have been a valid position of an item");
1946                    // We just changed the in-memory representation; synchronize this with
1947                    // the store.
1948                    self.propagate_changes().await?;
1949                }
1950                EventLocation::Store => {
1951                    self.save_events([event]).await?;
1952                }
1953            }
1954
1955            Ok(())
1956        }
1957
1958        /// If the given event is a redaction, try to retrieve the
1959        /// to-be-redacted event in the chunk, and replace it by the
1960        /// redacted form.
1961        #[instrument(skip_all)]
1962        async fn maybe_apply_new_redaction(
1963            &mut self,
1964            event: &Event,
1965        ) -> Result<(), EventCacheError> {
1966            let raw_event = event.raw();
1967
1968            // Do not deserialise the entire event if we aren't certain it's a
1969            // `m.room.redaction`. It saves a non-negligible amount of computations.
1970            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
1971                raw_event.get_field::<MessageLikeEventType>("type")
1972            else {
1973                return Ok(());
1974            };
1975
1976            // It is a `m.room.redaction`! We can deserialize it entirely.
1977
1978            let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
1979                redaction,
1980            ))) = raw_event.deserialize()
1981            else {
1982                return Ok(());
1983            };
1984
1985            let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else {
1986                warn!("missing target event id from the redaction event");
1987                return Ok(());
1988            };
1989
1990            // Replace the redacted event by a redacted form, if we knew about it.
1991            let Some((location, mut target_event)) = self.find_event(event_id).await? else {
1992                trace!("redacted event is missing from the linked chunk");
1993                return Ok(());
1994            };
1995
1996            // Don't redact already redacted events.
1997            let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
1998                // TODO: replace with `deserialized.is_redacted()` when
1999                // https://github.com/ruma/ruma/pull/2254 has been merged.
2000                match deserialized {
2001                    AnySyncTimelineEvent::MessageLike(ev) => {
2002                        if ev.is_redacted() {
2003                            return Ok(());
2004                        }
2005                    }
2006                    AnySyncTimelineEvent::State(ev) => {
2007                        if ev.is_redacted() {
2008                            return Ok(());
2009                        }
2010                    }
2011                }
2012
2013                // If the event is part of a thread, update the thread linked chunk and the
2014                // summary.
2015                extract_thread_root(target_event.raw())
2016            } else {
2017                warn!("failed to deserialize the event to redact");
2018                None
2019            };
2020
2021            if let Some(redacted_event) = apply_redaction(
2022                target_event.raw(),
2023                event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
2024                &self.state.room_version_rules.redaction,
2025            ) {
2026                // It's safe to cast `redacted_event` here:
2027                // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
2028                //   when calling .raw(), so it's still one under the hood.
2029                // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
2030                target_event.replace_raw(redacted_event.cast_unchecked());
2031
2032                self.replace_event_at(location, target_event).await?;
2033
2034                // If the redacted event was part of a thread, remove it in the thread linked
2035                // chunk too, and make sure to update the thread root's summary
2036                // as well.
2037                //
2038                // Note: there is an ordering issue here: the above `replace_event_at` must
2039                // happen BEFORE we recompute the summary, otherwise the set of
2040                // replies may include the to-be-redacted event.
2041                if let Some(thread_root) = thread_root
2042                    && let Some(thread_cache) = self.state.threads.get_mut(&thread_root)
2043                {
2044                    thread_cache.remove_if_present(event_id);
2045
2046                    // The number of replies may have changed, so update the thread summary if
2047                    // needs be.
2048                    let latest_event_id = thread_cache.latest_event_id();
2049                    self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
2050                }
2051            }
2052
2053            Ok(())
2054        }
2055
2056        /// Save events into the database, without notifying observers.
2057        pub async fn save_events(
2058            &mut self,
2059            events: impl IntoIterator<Item = Event>,
2060        ) -> Result<(), EventCacheError> {
2061            let store = self.store.clone();
2062            let room_id = self.state.room_id.clone();
2063            let events = events.into_iter().collect::<Vec<_>>();
2064
2065            // Spawn a task so the save is uninterrupted by task cancellation.
2066            spawn(async move {
2067                for event in events {
2068                    store.save_event(&room_id, event).await?;
2069                }
2070                super::Result::Ok(())
2071            })
2072            .await
2073            .expect("joining failed")?;
2074
2075            Ok(())
2076        }
2077
2078        #[cfg(test)]
2079        pub fn is_dirty(&self) -> bool {
2080            EventCacheStoreLockGuard::is_dirty(&self.store)
2081        }
2082    }
2083
2084    /// Load a linked chunk's full metadata, making sure the chunks are
2085    /// according to their their links.
2086    ///
2087    /// Returns `None` if there's no such linked chunk in the store, or an
2088    /// error if the linked chunk is malformed.
2089    async fn load_linked_chunk_metadata(
2090        store_guard: &EventCacheStoreLockGuard,
2091        linked_chunk_id: LinkedChunkId<'_>,
2092    ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
2093        let mut all_chunks = store_guard
2094            .load_all_chunks_metadata(linked_chunk_id)
2095            .await
2096            .map_err(EventCacheError::from)?;
2097
2098        if all_chunks.is_empty() {
2099            // There are no chunks, so there's nothing to do.
2100            return Ok(None);
2101        }
2102
2103        // Transform the vector into a hashmap, for quick lookup of the predecessors.
2104        let chunk_map: HashMap<_, _> =
2105            all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();
2106
2107        // Find a last chunk.
2108        let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
2109        let Some(last) = iter.next() else {
2110            return Err(EventCacheError::InvalidLinkedChunkMetadata {
2111                details: "no last chunk found".to_owned(),
2112            });
2113        };
2114
2115        // There must at most one last chunk.
2116        if let Some(other_last) = iter.next() {
2117            return Err(EventCacheError::InvalidLinkedChunkMetadata {
2118                details: format!(
2119                    "chunks {} and {} both claim to be last chunks",
2120                    last.identifier.index(),
2121                    other_last.identifier.index()
2122                ),
2123            });
2124        }
2125
2126        // Rewind the chain back to the first chunk, and do some checks at the same
2127        // time.
2128        let mut seen = HashSet::new();
2129        let mut current = last;
2130        loop {
2131            // If we've already seen this chunk, there's a cycle somewhere.
2132            if !seen.insert(current.identifier) {
2133                return Err(EventCacheError::InvalidLinkedChunkMetadata {
2134                    details: format!(
2135                        "cycle detected in linked chunk at {}",
2136                        current.identifier.index()
2137                    ),
2138                });
2139            }
2140
2141            let Some(prev_id) = current.previous else {
2142                // If there's no previous chunk, we're done.
2143                if seen.len() != all_chunks.len() {
2144                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
2145                        details: format!(
2146                            "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
2147                            seen.len(),
2148                            all_chunks.len()
2149                        ),
2150                    });
2151                }
2152                break;
2153            };
2154
2155            // If the previous chunk is not in the map, then it's unknown
2156            // and missing.
2157            let Some(pred_meta) = chunk_map.get(&prev_id) else {
2158                return Err(EventCacheError::InvalidLinkedChunkMetadata {
2159                    details: format!(
2160                        "missing predecessor {} chunk for {}",
2161                        prev_id.index(),
2162                        current.identifier.index()
2163                    ),
2164                });
2165            };
2166
2167            // If the previous chunk isn't connected to the next, then the link is invalid.
2168            if pred_meta.next != Some(current.identifier) {
2169                return Err(EventCacheError::InvalidLinkedChunkMetadata {
2170                    details: format!(
2171                        "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
2172                        pred_meta.identifier.index(),
2173                        pred_meta.next.map(|chunk_id| chunk_id.index()),
2174                        current.identifier.index()
2175                    ),
2176                });
2177            }
2178
2179            current = *pred_meta;
2180        }
2181
2182        // At this point, `current` is the identifier of the first chunk.
2183        //
2184        // Reorder the resulting vector, by going through the chain of `next` links, and
2185        // swapping items into their final position.
2186        //
2187        // Invariant in this loop: all items in [0..i[ are in their final, correct
2188        // position.
2189        let mut current = current.identifier;
2190        for i in 0..all_chunks.len() {
2191            // Find the target metadata.
2192            let j = all_chunks
2193                .iter()
2194                .rev()
2195                .position(|meta| meta.identifier == current)
2196                .map(|j| all_chunks.len() - 1 - j)
2197                .expect("the target chunk must be present in the metadata");
2198            if i != j {
2199                all_chunks.swap(i, j);
2200            }
2201            if let Some(next) = all_chunks[i].next {
2202                current = next;
2203            }
2204        }
2205
2206        Ok(Some(all_chunks))
2207    }
2208
2209    /// Removes the bundled relations from an event, if they were present.
2210    ///
2211    /// Only replaces the present if it contained bundled relations.
2212    fn strip_relations_if_present<T>(event: &mut Raw<T>) {
2213        // We're going to get rid of the `unsigned`/`m.relations` field, if it's
2214        // present.
2215        // Use a closure that returns an option so we can quickly short-circuit.
2216        let mut closure = || -> Option<()> {
2217            let mut val: serde_json::Value = event.deserialize_as().ok()?;
2218            let unsigned = val.get_mut("unsigned")?;
2219            let unsigned_obj = unsigned.as_object_mut()?;
2220            if unsigned_obj.remove("m.relations").is_some() {
2221                *event = Raw::new(&val).ok()?.cast_unchecked();
2222            }
2223            None
2224        };
2225        let _ = closure();
2226    }
2227
2228    fn strip_relations_from_event(ev: &mut Event) {
2229        match &mut ev.kind {
2230            TimelineEventKind::Decrypted(decrypted) => {
2231                // Remove all information about encryption info for
2232                // the bundled events.
2233                decrypted.unsigned_encryption_info = None;
2234
2235                // Remove the `unsigned`/`m.relations` field, if needs be.
2236                strip_relations_if_present(&mut decrypted.event);
2237            }
2238
2239            TimelineEventKind::UnableToDecrypt { event, .. }
2240            | TimelineEventKind::PlainText { event } => {
2241                strip_relations_if_present(event);
2242            }
2243        }
2244    }
2245
2246    /// Strips the bundled relations from a collection of events.
2247    fn strip_relations_from_events(items: &mut [Event]) {
2248        for ev in items.iter_mut() {
2249            strip_relations_from_event(ev);
2250        }
2251    }
2252
2253    /// Implementation of [`RoomEventCacheStateLockReadGuard::find_event`] and
2254    /// [`RoomEventCacheStateLockWriteGuard::find_event`].
2255    async fn find_event(
2256        event_id: &EventId,
2257        room_id: &RoomId,
2258        room_linked_chunk: &EventLinkedChunk,
2259        store: &EventCacheStoreLockGuard,
2260    ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
2261        // There are supposedly fewer events loaded in memory than in the store. Let's
2262        // start by looking up in the `EventLinkedChunk`.
2263        for (position, event) in room_linked_chunk.revents() {
2264            if event.event_id().as_deref() == Some(event_id) {
2265                return Ok(Some((EventLocation::Memory(position), event.clone())));
2266            }
2267        }
2268
2269        Ok(store.find_event(room_id, event_id).await?.map(|event| (EventLocation::Store, event)))
2270    }
2271
2272    /// Implementation of
2273    /// [`RoomEventCacheStateLockReadGuard::find_event_with_relations`] and
2274    /// [`RoomEventCacheStateLockWriteGuard::find_event_with_relations`].
2275    async fn find_event_with_relations(
2276        event_id: &EventId,
2277        room_id: &RoomId,
2278        filters: Option<Vec<RelationType>>,
2279        room_linked_chunk: &EventLinkedChunk,
2280        store: &EventCacheStoreLockGuard,
2281    ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
2282        // First, hit storage to get the target event and its related events.
2283        let found = store.find_event(room_id, event_id).await?;
2284
2285        let Some(target) = found else {
2286            // We haven't found the event: return early.
2287            return Ok(None);
2288        };
2289
2290        // Then, initialize the stack with all the related events, to find the
2291        // transitive closure of all the related events.
2292        let mut related = store.find_event_relations(room_id, event_id, filters.as_deref()).await?;
2293        let mut stack =
2294            related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();
2295
2296        // Also keep track of already seen events, in case there's a loop in the
2297        // relation graph.
2298        let mut already_seen = HashSet::new();
2299        already_seen.insert(event_id.to_owned());
2300
2301        let mut num_iters = 1;
2302
2303        // Find the related event for each previously-related event.
2304        while let Some(event_id) = stack.pop() {
2305            if !already_seen.insert(event_id.clone()) {
2306                // Skip events we've already seen.
2307                continue;
2308            }
2309
2310            let other_related =
2311                store.find_event_relations(room_id, &event_id, filters.as_deref()).await?;
2312
2313            stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
2314            related.extend(other_related);
2315
2316            num_iters += 1;
2317        }
2318
2319        trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
2320
2321        // Sort the results by their positions in the linked chunk, if available.
2322        //
2323        // If an event doesn't have a known position, it goes to the start of the array.
2324        related.sort_by(|(_, lhs), (_, rhs)| {
2325            use std::cmp::Ordering;
2326
2327            match (lhs, rhs) {
2328                (None, None) => Ordering::Equal,
2329                (None, Some(_)) => Ordering::Less,
2330                (Some(_), None) => Ordering::Greater,
2331                (Some(lhs), Some(rhs)) => {
2332                    let lhs = room_linked_chunk.event_order(*lhs);
2333                    let rhs = room_linked_chunk.event_order(*rhs);
2334
2335                    // The events should have a definite position, but in the case they don't,
2336                    // still consider that not having a position means you'll end at the start
2337                    // of the array.
2338                    match (lhs, rhs) {
2339                        (None, None) => Ordering::Equal,
2340                        (None, Some(_)) => Ordering::Less,
2341                        (Some(_), None) => Ordering::Greater,
2342                        (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
2343                    }
2344                }
2345            }
2346        });
2347
2348        // Keep only the events, not their positions.
2349        let related = related.into_iter().map(|(event, _pos)| event).collect();
2350
2351        Ok(Some((target, related)))
2352    }
2353}
2354
2355/// An enum representing where an event has been found.
2356pub(super) enum EventLocation {
2357    /// Event lives in memory (and likely in the store!).
2358    Memory(Position),
2359
2360    /// Event lives in the store only, it has not been loaded in memory yet.
2361    Store,
2362}
2363
2364pub(super) use private::RoomEventCacheStateLock;
2365
2366#[cfg(test)]
2367mod tests {
2368    use matrix_sdk_base::event_cache::Event;
2369    use matrix_sdk_test::{async_test, event_factory::EventFactory};
2370    use ruma::{
2371        RoomId, event_id,
2372        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2373        room_id, user_id,
2374    };
2375
2376    use crate::test_utils::logged_in_client;
2377
2378    #[async_test]
2379    async fn test_find_event_by_id_with_edit_relation() {
2380        let original_id = event_id!("$original");
2381        let related_id = event_id!("$related");
2382        let room_id = room_id!("!galette:saucisse.bzh");
2383        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2384
2385        assert_relations(
2386            room_id,
2387            f.text_msg("Original event").event_id(original_id).into(),
2388            f.text_msg("* An edited event")
2389                .edit(
2390                    original_id,
2391                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
2392                )
2393                .event_id(related_id)
2394                .into(),
2395            f,
2396        )
2397        .await;
2398    }
2399
2400    #[async_test]
2401    async fn test_find_event_by_id_with_thread_reply_relation() {
2402        let original_id = event_id!("$original");
2403        let related_id = event_id!("$related");
2404        let room_id = room_id!("!galette:saucisse.bzh");
2405        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2406
2407        assert_relations(
2408            room_id,
2409            f.text_msg("Original event").event_id(original_id).into(),
2410            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
2411            f,
2412        )
2413        .await;
2414    }
2415
2416    #[async_test]
2417    async fn test_find_event_by_id_with_reaction_relation() {
2418        let original_id = event_id!("$original");
2419        let related_id = event_id!("$related");
2420        let room_id = room_id!("!galette:saucisse.bzh");
2421        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2422
2423        assert_relations(
2424            room_id,
2425            f.text_msg("Original event").event_id(original_id).into(),
2426            f.reaction(original_id, ":D").event_id(related_id).into(),
2427            f,
2428        )
2429        .await;
2430    }
2431
2432    #[async_test]
2433    async fn test_find_event_by_id_with_poll_response_relation() {
2434        let original_id = event_id!("$original");
2435        let related_id = event_id!("$related");
2436        let room_id = room_id!("!galette:saucisse.bzh");
2437        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2438
2439        assert_relations(
2440            room_id,
2441            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2442                .event_id(original_id)
2443                .into(),
2444            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
2445            f,
2446        )
2447        .await;
2448    }
2449
2450    #[async_test]
2451    async fn test_find_event_by_id_with_poll_end_relation() {
2452        let original_id = event_id!("$original");
2453        let related_id = event_id!("$related");
2454        let room_id = room_id!("!galette:saucisse.bzh");
2455        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2456
2457        assert_relations(
2458            room_id,
2459            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2460                .event_id(original_id)
2461                .into(),
2462            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
2463            f,
2464        )
2465        .await;
2466    }
2467
2468    #[async_test]
2469    async fn test_find_event_by_id_with_filtered_relationships() {
2470        let original_id = event_id!("$original");
2471        let related_id = event_id!("$related");
2472        let associated_related_id = event_id!("$recursive_related");
2473        let room_id = room_id!("!galette:saucisse.bzh");
2474        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2475
2476        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2477        let related_event = event_factory
2478            .text_msg("* Edited event")
2479            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2480            .event_id(related_id)
2481            .into();
2482        let associated_related_event =
2483            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
2484
2485        let client = logged_in_client(None).await;
2486
2487        let event_cache = client.event_cache();
2488        event_cache.subscribe().unwrap();
2489
2490        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2491        let room = client.get_room(room_id).unwrap();
2492
2493        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2494
2495        // Save the original event.
2496        room_event_cache.save_events([original_event]).await;
2497
2498        // Save the related event.
2499        room_event_cache.save_events([related_event]).await;
2500
2501        // Save the associated related event, which redacts the related event.
2502        room_event_cache.save_events([associated_related_event]).await;
2503
2504        let filter = Some(vec![RelationType::Replacement]);
2505        let (event, related_events) = room_event_cache
2506            .find_event_with_relations(original_id, filter)
2507            .await
2508            .expect("Failed to find the event with relations")
2509            .expect("Event has no relation");
2510        // Fetched event is the right one.
2511        let cached_event_id = event.event_id().unwrap();
2512        assert_eq!(cached_event_id, original_id);
2513
2514        // There's only the edit event (an edit event can't have its own edit event).
2515        assert_eq!(related_events.len(), 1);
2516
2517        let related_event_id = related_events[0].event_id().unwrap();
2518        assert_eq!(related_event_id, related_id);
2519
2520        // Now we'll filter threads instead, there should be no related events
2521        let filter = Some(vec![RelationType::Thread]);
2522        let (event, related_events) = room_event_cache
2523            .find_event_with_relations(original_id, filter)
2524            .await
2525            .expect("Failed to find the event with relations")
2526            .expect("Event has no relation");
2527
2528        // Fetched event is the right one.
2529        let cached_event_id = event.event_id().unwrap();
2530        assert_eq!(cached_event_id, original_id);
2531        // No Thread related events found
2532        assert!(related_events.is_empty());
2533    }
2534
2535    #[async_test]
2536    async fn test_find_event_by_id_with_recursive_relation() {
2537        let original_id = event_id!("$original");
2538        let related_id = event_id!("$related");
2539        let associated_related_id = event_id!("$recursive_related");
2540        let room_id = room_id!("!galette:saucisse.bzh");
2541        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2542
2543        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2544        let related_event = event_factory
2545            .text_msg("* Edited event")
2546            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2547            .event_id(related_id)
2548            .into();
2549        let associated_related_event =
2550            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
2551
2552        let client = logged_in_client(None).await;
2553
2554        let event_cache = client.event_cache();
2555        event_cache.subscribe().unwrap();
2556
2557        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2558        let room = client.get_room(room_id).unwrap();
2559
2560        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2561
2562        // Save the original event.
2563        room_event_cache.save_events([original_event]).await;
2564
2565        // Save the related event.
2566        room_event_cache.save_events([related_event]).await;
2567
2568        // Save the associated related event, which redacts the related event.
2569        room_event_cache.save_events([associated_related_event]).await;
2570
2571        let (event, related_events) = room_event_cache
2572            .find_event_with_relations(original_id, None)
2573            .await
2574            .expect("Failed to find the event with relations")
2575            .expect("Event has no relation");
2576        // Fetched event is the right one.
2577        let cached_event_id = event.event_id().unwrap();
2578        assert_eq!(cached_event_id, original_id);
2579
2580        // There are both the related id and the associatively related id
2581        assert_eq!(related_events.len(), 2);
2582
2583        let related_event_id = related_events[0].event_id().unwrap();
2584        assert_eq!(related_event_id, related_id);
2585        let related_event_id = related_events[1].event_id().unwrap();
2586        assert_eq!(related_event_id, associated_related_id);
2587    }
2588
2589    async fn assert_relations(
2590        room_id: &RoomId,
2591        original_event: Event,
2592        related_event: Event,
2593        event_factory: EventFactory,
2594    ) {
2595        let client = logged_in_client(None).await;
2596
2597        let event_cache = client.event_cache();
2598        event_cache.subscribe().unwrap();
2599
2600        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2601        let room = client.get_room(room_id).unwrap();
2602
2603        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2604
2605        // Save the original event.
2606        let original_event_id = original_event.event_id().unwrap();
2607        room_event_cache.save_events([original_event]).await;
2608
2609        // Save an unrelated event to check it's not in the related events list.
2610        let unrelated_id = event_id!("$2");
2611        room_event_cache
2612            .save_events([event_factory
2613                .text_msg("An unrelated event")
2614                .event_id(unrelated_id)
2615                .into()])
2616            .await;
2617
2618        // Save the related event.
2619        let related_id = related_event.event_id().unwrap();
2620        room_event_cache.save_events([related_event]).await;
2621
2622        let (event, related_events) = room_event_cache
2623            .find_event_with_relations(&original_event_id, None)
2624            .await
2625            .expect("Failed to find the event with relations")
2626            .expect("Event has no relation");
2627        // Fetched event is the right one.
2628        let cached_event_id = event.event_id().unwrap();
2629        assert_eq!(cached_event_id, original_event_id);
2630
2631        // There is only the actually related event in the related ones
2632        let related_event_id = related_events[0].event_id().unwrap();
2633        assert_eq!(related_event_id, related_id);
2634    }
2635}
2636
2637#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
2638mod timed_tests {
2639    use std::{ops::Not, sync::Arc};
2640
2641    use assert_matches::assert_matches;
2642    use assert_matches2::assert_let;
2643    use eyeball_im::VectorDiff;
2644    use futures_util::FutureExt;
2645    use matrix_sdk_base::{
2646        event_cache::{
2647            Gap,
2648            store::{EventCacheStore as _, MemoryStore},
2649        },
2650        linked_chunk::{
2651            ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
2652            lazy_loader::from_all_chunks,
2653        },
2654        store::StoreConfig,
2655        sync::{JoinedRoomUpdate, Timeline},
2656    };
2657    use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
2658    use ruma::{
2659        EventId, OwnedUserId, event_id,
2660        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2661        room_id, user_id,
2662    };
2663    use tokio::task::yield_now;
2664
2665    use super::RoomEventCacheGenericUpdate;
2666    use crate::{
2667        assert_let_timeout,
2668        event_cache::{RoomEventCache, RoomEventCacheUpdate, room::LoadMoreEventsBackwardsOutcome},
2669        test_utils::client::MockClientBuilder,
2670    };
2671
2672    #[async_test]
2673    async fn test_write_to_storage() {
2674        let room_id = room_id!("!galette:saucisse.bzh");
2675        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2676
2677        let event_cache_store = Arc::new(MemoryStore::new());
2678
2679        let client = MockClientBuilder::new(None)
2680            .on_builder(|builder| {
2681                builder.store_config(
2682                    StoreConfig::new("hodlor".to_owned())
2683                        .event_cache_store(event_cache_store.clone()),
2684                )
2685            })
2686            .build()
2687            .await;
2688
2689        let event_cache = client.event_cache();
2690
2691        // Don't forget to subscribe and like.
2692        event_cache.subscribe().unwrap();
2693
2694        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2695        let room = client.get_room(room_id).unwrap();
2696
2697        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2698        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2699
2700        // Propagate an update for a message and a prev-batch token.
2701        let timeline = Timeline {
2702            limited: true,
2703            prev_batch: Some("raclette".to_owned()),
2704            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
2705        };
2706
2707        room_event_cache
2708            .inner
2709            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2710            .await
2711            .unwrap();
2712
2713        // Just checking the generic update is correct.
2714        assert_matches!(
2715            generic_stream.recv().await,
2716            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2717                assert_eq!(expected_room_id, room_id);
2718            }
2719        );
2720
2721        // Check the storage.
2722        let linked_chunk = from_all_chunks::<3, _, _>(
2723            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2724        )
2725        .unwrap()
2726        .unwrap();
2727
2728        assert_eq!(linked_chunk.chunks().count(), 2);
2729
2730        let mut chunks = linked_chunk.chunks();
2731
2732        // We start with the gap.
2733        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
2734            assert_eq!(gap.prev_token, "raclette");
2735        });
2736
2737        // Then we have the stored event.
2738        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2739            assert_eq!(events.len(), 1);
2740            let deserialized = events[0].raw().deserialize().unwrap();
2741            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
2742            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
2743        });
2744
2745        // That's all, folks!
2746        assert!(chunks.next().is_none());
2747    }
2748
2749    #[async_test]
2750    async fn test_write_to_storage_strips_bundled_relations() {
2751        let room_id = room_id!("!galette:saucisse.bzh");
2752        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2753
2754        let event_cache_store = Arc::new(MemoryStore::new());
2755
2756        let client = MockClientBuilder::new(None)
2757            .on_builder(|builder| {
2758                builder.store_config(
2759                    StoreConfig::new("hodlor".to_owned())
2760                        .event_cache_store(event_cache_store.clone()),
2761                )
2762            })
2763            .build()
2764            .await;
2765
2766        let event_cache = client.event_cache();
2767
2768        // Don't forget to subscribe and like.
2769        event_cache.subscribe().unwrap();
2770
2771        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2772        let room = client.get_room(room_id).unwrap();
2773
2774        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2775        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2776
2777        // Propagate an update for a message with bundled relations.
2778        let ev = f
2779            .text_msg("hey yo")
2780            .sender(*ALICE)
2781            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
2782            .into_event();
2783
2784        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
2785
2786        room_event_cache
2787            .inner
2788            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2789            .await
2790            .unwrap();
2791
2792        // Just checking the generic update is correct.
2793        assert_matches!(
2794            generic_stream.recv().await,
2795            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2796                assert_eq!(expected_room_id, room_id);
2797            }
2798        );
2799
2800        // The in-memory linked chunk keeps the bundled relation.
2801        {
2802            let events = room_event_cache.events().await.unwrap();
2803
2804            assert_eq!(events.len(), 1);
2805
2806            let ev = events[0].raw().deserialize().unwrap();
2807            assert_let!(
2808                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
2809            );
2810
2811            let original = msg.as_original().unwrap();
2812            assert_eq!(original.content.body(), "hey yo");
2813            assert!(original.unsigned.relations.replace.is_some());
2814        }
2815
2816        // The one in storage does not.
2817        let linked_chunk = from_all_chunks::<3, _, _>(
2818            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2819        )
2820        .unwrap()
2821        .unwrap();
2822
2823        assert_eq!(linked_chunk.chunks().count(), 1);
2824
2825        let mut chunks = linked_chunk.chunks();
2826        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2827            assert_eq!(events.len(), 1);
2828
2829            let ev = events[0].raw().deserialize().unwrap();
2830            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
2831
2832            let original = msg.as_original().unwrap();
2833            assert_eq!(original.content.body(), "hey yo");
2834            assert!(original.unsigned.relations.replace.is_none());
2835        });
2836
2837        // That's all, folks!
2838        assert!(chunks.next().is_none());
2839    }
2840
2841    #[async_test]
2842    async fn test_clear() {
2843        let room_id = room_id!("!galette:saucisse.bzh");
2844        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2845
2846        let event_cache_store = Arc::new(MemoryStore::new());
2847
2848        let event_id1 = event_id!("$1");
2849        let event_id2 = event_id!("$2");
2850
2851        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2852        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2853
2854        // Prefill the store with some data.
2855        event_cache_store
2856            .handle_linked_chunk_updates(
2857                LinkedChunkId::Room(room_id),
2858                vec![
2859                    // An empty items chunk.
2860                    Update::NewItemsChunk {
2861                        previous: None,
2862                        new: ChunkIdentifier::new(0),
2863                        next: None,
2864                    },
2865                    // A gap chunk.
2866                    Update::NewGapChunk {
2867                        previous: Some(ChunkIdentifier::new(0)),
2868                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
2869                        new: ChunkIdentifier::new(42),
2870                        next: None,
2871                        gap: Gap { prev_token: "comté".to_owned() },
2872                    },
2873                    // Another items chunk, non-empty this time.
2874                    Update::NewItemsChunk {
2875                        previous: Some(ChunkIdentifier::new(42)),
2876                        new: ChunkIdentifier::new(1),
2877                        next: None,
2878                    },
2879                    Update::PushItems {
2880                        at: Position::new(ChunkIdentifier::new(1), 0),
2881                        items: vec![ev1.clone()],
2882                    },
2883                    // And another items chunk, non-empty again.
2884                    Update::NewItemsChunk {
2885                        previous: Some(ChunkIdentifier::new(1)),
2886                        new: ChunkIdentifier::new(2),
2887                        next: None,
2888                    },
2889                    Update::PushItems {
2890                        at: Position::new(ChunkIdentifier::new(2), 0),
2891                        items: vec![ev2.clone()],
2892                    },
2893                ],
2894            )
2895            .await
2896            .unwrap();
2897
2898        let client = MockClientBuilder::new(None)
2899            .on_builder(|builder| {
2900                builder.store_config(
2901                    StoreConfig::new("hodlor".to_owned())
2902                        .event_cache_store(event_cache_store.clone()),
2903                )
2904            })
2905            .build()
2906            .await;
2907
2908        let event_cache = client.event_cache();
2909
2910        // Don't forget to subscribe and like.
2911        event_cache.subscribe().unwrap();
2912
2913        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2914        let room = client.get_room(room_id).unwrap();
2915
2916        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2917
2918        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
2919        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2920
2921        // The rooms knows about all cached events.
2922        {
2923            assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
2924            assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
2925        }
2926
2927        // But only part of events are loaded from the store
2928        {
2929            // The room must contain only one event because only one chunk has been loaded.
2930            assert_eq!(items.len(), 1);
2931            assert_eq!(items[0].event_id().unwrap(), event_id2);
2932
2933            assert!(stream.is_empty());
2934        }
2935
2936        // Let's load more chunks to load all events.
2937        {
2938            room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2939
2940            assert_let_timeout!(
2941                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2942            );
2943            assert_eq!(diffs.len(), 1);
2944            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2945                // Here you are `event_id1`!
2946                assert_eq!(event.event_id().unwrap(), event_id1);
2947            });
2948
2949            assert!(stream.is_empty());
2950        }
2951
2952        // After clearing,…
2953        room_event_cache.clear().await.unwrap();
2954
2955        //… we get an update that the content has been cleared.
2956        assert_let_timeout!(
2957            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2958        );
2959        assert_eq!(diffs.len(), 1);
2960        assert_let!(VectorDiff::Clear = &diffs[0]);
2961
2962        // … same with a generic update.
2963        assert_let_timeout!(
2964            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
2965        );
2966        assert_eq!(received_room_id, room_id);
2967
2968        // Events individually are not forgotten by the event cache, after clearing a
2969        // room.
2970        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
2971
2972        // But their presence in a linked chunk is forgotten.
2973        let items = room_event_cache.events().await.unwrap();
2974        assert!(items.is_empty());
2975
2976        // The event cache store too.
2977        let linked_chunk = from_all_chunks::<3, _, _>(
2978            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2979        )
2980        .unwrap()
2981        .unwrap();
2982
2983        // Note: while the event cache store could return `None` here, clearing it will
2984        // reset it to its initial form, maintaining the invariant that it
2985        // contains a single items chunk that's empty.
2986        assert_eq!(linked_chunk.num_items(), 0);
2987    }
2988
2989    #[async_test]
2990    async fn test_load_from_storage() {
2991        let room_id = room_id!("!galette:saucisse.bzh");
2992        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2993
2994        let event_cache_store = Arc::new(MemoryStore::new());
2995
2996        let event_id1 = event_id!("$1");
2997        let event_id2 = event_id!("$2");
2998
2999        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
3000        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
3001
3002        // Prefill the store with some data.
3003        event_cache_store
3004            .handle_linked_chunk_updates(
3005                LinkedChunkId::Room(room_id),
3006                vec![
3007                    // An empty items chunk.
3008                    Update::NewItemsChunk {
3009                        previous: None,
3010                        new: ChunkIdentifier::new(0),
3011                        next: None,
3012                    },
3013                    // A gap chunk.
3014                    Update::NewGapChunk {
3015                        previous: Some(ChunkIdentifier::new(0)),
3016                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
3017                        new: ChunkIdentifier::new(42),
3018                        next: None,
3019                        gap: Gap { prev_token: "cheddar".to_owned() },
3020                    },
3021                    // Another items chunk, non-empty this time.
3022                    Update::NewItemsChunk {
3023                        previous: Some(ChunkIdentifier::new(42)),
3024                        new: ChunkIdentifier::new(1),
3025                        next: None,
3026                    },
3027                    Update::PushItems {
3028                        at: Position::new(ChunkIdentifier::new(1), 0),
3029                        items: vec![ev1.clone()],
3030                    },
3031                    // And another items chunk, non-empty again.
3032                    Update::NewItemsChunk {
3033                        previous: Some(ChunkIdentifier::new(1)),
3034                        new: ChunkIdentifier::new(2),
3035                        next: None,
3036                    },
3037                    Update::PushItems {
3038                        at: Position::new(ChunkIdentifier::new(2), 0),
3039                        items: vec![ev2.clone()],
3040                    },
3041                ],
3042            )
3043            .await
3044            .unwrap();
3045
3046        let client = MockClientBuilder::new(None)
3047            .on_builder(|builder| {
3048                builder.store_config(
3049                    StoreConfig::new("hodlor".to_owned())
3050                        .event_cache_store(event_cache_store.clone()),
3051                )
3052            })
3053            .build()
3054            .await;
3055
3056        let event_cache = client.event_cache();
3057
3058        // Don't forget to subscribe and like.
3059        event_cache.subscribe().unwrap();
3060
3061        // Let's check whether the generic updates are received for the initialisation.
3062        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3063
3064        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3065        let room = client.get_room(room_id).unwrap();
3066
3067        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3068
3069        // The room event cache has been loaded. A generic update must have been
3070        // triggered.
3071        assert_matches!(
3072            generic_stream.recv().await,
3073            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3074                assert_eq!(room_id, expected_room_id);
3075            }
3076        );
3077
3078        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
3079
3080        // The initial items contain one event because only the last chunk is loaded by
3081        // default.
3082        assert_eq!(items.len(), 1);
3083        assert_eq!(items[0].event_id().unwrap(), event_id2);
3084        assert!(stream.is_empty());
3085
3086        // The event cache knows only all events though, even if they aren't loaded.
3087        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
3088        assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
3089
3090        // Let's paginate to load more events.
3091        room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3092
3093        assert_let_timeout!(
3094            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3095        );
3096        assert_eq!(diffs.len(), 1);
3097        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
3098            assert_eq!(event.event_id().unwrap(), event_id1);
3099        });
3100
3101        assert!(stream.is_empty());
3102
3103        // A generic update is triggered too.
3104        assert_matches!(
3105            generic_stream.recv().await,
3106            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3107                assert_eq!(expected_room_id, room_id);
3108            }
3109        );
3110
3111        // A new update with one of these events leads to deduplication.
3112        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
3113
3114        room_event_cache
3115            .inner
3116            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
3117            .await
3118            .unwrap();
3119
3120        // Just checking the generic update is correct. There is a duplicate event, so
3121        // no generic changes whatsoever!
3122        assert!(generic_stream.recv().now_or_never().is_none());
3123
3124        // The stream doesn't report these changes *yet*. Use the items vector given
3125        // when subscribing, to check that the items correspond to their new
3126        // positions. The duplicated item is removed (so it's not the first
3127        // element anymore), and it's added to the back of the list.
3128        let items = room_event_cache.events().await.unwrap();
3129        assert_eq!(items.len(), 2);
3130        assert_eq!(items[0].event_id().unwrap(), event_id1);
3131        assert_eq!(items[1].event_id().unwrap(), event_id2);
3132    }
3133
3134    #[async_test]
3135    async fn test_load_from_storage_resilient_to_failure() {
3136        let room_id = room_id!("!fondue:patate.ch");
3137        let event_cache_store = Arc::new(MemoryStore::new());
3138
3139        let event = EventFactory::new()
3140            .room(room_id)
3141            .sender(user_id!("@ben:saucisse.bzh"))
3142            .text_msg("foo")
3143            .event_id(event_id!("$42"))
3144            .into_event();
3145
3146        // Prefill the store with invalid data: two chunks that form a cycle.
3147        event_cache_store
3148            .handle_linked_chunk_updates(
3149                LinkedChunkId::Room(room_id),
3150                vec![
3151                    Update::NewItemsChunk {
3152                        previous: None,
3153                        new: ChunkIdentifier::new(0),
3154                        next: None,
3155                    },
3156                    Update::PushItems {
3157                        at: Position::new(ChunkIdentifier::new(0), 0),
3158                        items: vec![event],
3159                    },
3160                    Update::NewItemsChunk {
3161                        previous: Some(ChunkIdentifier::new(0)),
3162                        new: ChunkIdentifier::new(1),
3163                        next: Some(ChunkIdentifier::new(0)),
3164                    },
3165                ],
3166            )
3167            .await
3168            .unwrap();
3169
3170        let client = MockClientBuilder::new(None)
3171            .on_builder(|builder| {
3172                builder.store_config(
3173                    StoreConfig::new("holder".to_owned())
3174                        .event_cache_store(event_cache_store.clone()),
3175                )
3176            })
3177            .build()
3178            .await;
3179
3180        let event_cache = client.event_cache();
3181
3182        // Don't forget to subscribe and like.
3183        event_cache.subscribe().unwrap();
3184
3185        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3186        let room = client.get_room(room_id).unwrap();
3187
3188        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3189
3190        let items = room_event_cache.events().await.unwrap();
3191
3192        // Because the persisted content was invalid, the room store is reset: there are
3193        // no events in the cache.
3194        assert!(items.is_empty());
3195
3196        // Storage doesn't contain anything. It would also be valid that it contains a
3197        // single initial empty items chunk.
3198        let raw_chunks =
3199            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
3200        assert!(raw_chunks.is_empty());
3201    }
3202
3203    #[async_test]
3204    async fn test_no_useless_gaps() {
3205        let room_id = room_id!("!galette:saucisse.bzh");
3206
3207        let client = MockClientBuilder::new(None).build().await;
3208
3209        let event_cache = client.event_cache();
3210        event_cache.subscribe().unwrap();
3211
3212        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3213        let room = client.get_room(room_id).unwrap();
3214        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3215        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3216
3217        let f = EventFactory::new().room(room_id).sender(*ALICE);
3218
3219        // Propagate an update including a limited timeline with one message and a
3220        // prev-batch token.
3221        room_event_cache
3222            .inner
3223            .handle_joined_room_update(JoinedRoomUpdate {
3224                timeline: Timeline {
3225                    limited: true,
3226                    prev_batch: Some("raclette".to_owned()),
3227                    events: vec![f.text_msg("hey yo").into_event()],
3228                },
3229                ..Default::default()
3230            })
3231            .await
3232            .unwrap();
3233
3234        // Just checking the generic update is correct.
3235        assert_matches!(
3236            generic_stream.recv().await,
3237            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3238                assert_eq!(expected_room_id, room_id);
3239            }
3240        );
3241
3242        {
3243            let mut state = room_event_cache.inner.state.write().await.unwrap();
3244
3245            let mut num_gaps = 0;
3246            let mut num_events = 0;
3247
3248            for c in state.room_linked_chunk().chunks() {
3249                match c.content() {
3250                    ChunkContent::Items(items) => num_events += items.len(),
3251                    ChunkContent::Gap(_) => num_gaps += 1,
3252                }
3253            }
3254
3255            // The limited sync unloads the chunk, so it will appear as if there are only
3256            // the events.
3257            assert_eq!(num_gaps, 0);
3258            assert_eq!(num_events, 1);
3259
3260            // But if I manually reload more of the chunk, the gap will be present.
3261            assert_matches!(
3262                state.load_more_events_backwards().await.unwrap(),
3263                LoadMoreEventsBackwardsOutcome::Gap { .. }
3264            );
3265
3266            num_gaps = 0;
3267            num_events = 0;
3268            for c in state.room_linked_chunk().chunks() {
3269                match c.content() {
3270                    ChunkContent::Items(items) => num_events += items.len(),
3271                    ChunkContent::Gap(_) => num_gaps += 1,
3272                }
3273            }
3274
3275            // The gap must have been stored.
3276            assert_eq!(num_gaps, 1);
3277            assert_eq!(num_events, 1);
3278        }
3279
3280        // Now, propagate an update for another message, but the timeline isn't limited
3281        // this time.
3282        room_event_cache
3283            .inner
3284            .handle_joined_room_update(JoinedRoomUpdate {
3285                timeline: Timeline {
3286                    limited: false,
3287                    prev_batch: Some("fondue".to_owned()),
3288                    events: vec![f.text_msg("sup").into_event()],
3289                },
3290                ..Default::default()
3291            })
3292            .await
3293            .unwrap();
3294
3295        // Just checking the generic update is correct.
3296        assert_matches!(
3297            generic_stream.recv().await,
3298            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3299                assert_eq!(expected_room_id, room_id);
3300            }
3301        );
3302
3303        {
3304            let state = room_event_cache.inner.state.read().await.unwrap();
3305
3306            let mut num_gaps = 0;
3307            let mut num_events = 0;
3308
3309            for c in state.room_linked_chunk().chunks() {
3310                match c.content() {
3311                    ChunkContent::Items(items) => num_events += items.len(),
3312                    ChunkContent::Gap(gap) => {
3313                        assert_eq!(gap.prev_token, "raclette");
3314                        num_gaps += 1;
3315                    }
3316                }
3317            }
3318
3319            // There's only the previous gap, no new ones.
3320            assert_eq!(num_gaps, 1);
3321            assert_eq!(num_events, 2);
3322        }
3323    }
3324
3325    #[async_test]
3326    async fn test_shrink_to_last_chunk() {
3327        let room_id = room_id!("!galette:saucisse.bzh");
3328
3329        let client = MockClientBuilder::new(None).build().await;
3330
3331        let f = EventFactory::new().room(room_id);
3332
3333        let evid1 = event_id!("$1");
3334        let evid2 = event_id!("$2");
3335
3336        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3337        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3338
3339        // Fill the event cache store with an initial linked chunk with 2 events chunks.
3340        {
3341            client
3342                .event_cache_store()
3343                .lock()
3344                .await
3345                .expect("Could not acquire the event cache lock")
3346                .as_clean()
3347                .expect("Could not acquire a clean event cache lock")
3348                .handle_linked_chunk_updates(
3349                    LinkedChunkId::Room(room_id),
3350                    vec![
3351                        Update::NewItemsChunk {
3352                            previous: None,
3353                            new: ChunkIdentifier::new(0),
3354                            next: None,
3355                        },
3356                        Update::PushItems {
3357                            at: Position::new(ChunkIdentifier::new(0), 0),
3358                            items: vec![ev1],
3359                        },
3360                        Update::NewItemsChunk {
3361                            previous: Some(ChunkIdentifier::new(0)),
3362                            new: ChunkIdentifier::new(1),
3363                            next: None,
3364                        },
3365                        Update::PushItems {
3366                            at: Position::new(ChunkIdentifier::new(1), 0),
3367                            items: vec![ev2],
3368                        },
3369                    ],
3370                )
3371                .await
3372                .unwrap();
3373        }
3374
3375        let event_cache = client.event_cache();
3376        event_cache.subscribe().unwrap();
3377
3378        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3379        let room = client.get_room(room_id).unwrap();
3380        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3381
3382        // Sanity check: lazily loaded, so only includes one item at start.
3383        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
3384        assert_eq!(events.len(), 1);
3385        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3386        assert!(stream.is_empty());
3387
3388        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3389
3390        // Force loading the full linked chunk by back-paginating.
3391        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3392        assert_eq!(outcome.events.len(), 1);
3393        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3394        assert!(outcome.reached_start);
3395
3396        // We also get an update about the loading from the store.
3397        assert_let_timeout!(
3398            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3399        );
3400        assert_eq!(diffs.len(), 1);
3401        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3402            assert_eq!(value.event_id().as_deref(), Some(evid1));
3403        });
3404
3405        assert!(stream.is_empty());
3406
3407        // Same for the generic update.
3408        assert_let_timeout!(
3409            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
3410        );
3411        assert_eq!(received_room_id, room_id);
3412
3413        // Shrink the linked chunk to the last chunk.
3414        let diffs = room_event_cache
3415            .inner
3416            .state
3417            .write()
3418            .await
3419            .unwrap()
3420            .force_shrink_to_last_chunk()
3421            .await
3422            .expect("shrinking should succeed");
3423
3424        // We receive updates about the changes to the linked chunk.
3425        assert_eq!(diffs.len(), 2);
3426        assert_matches!(&diffs[0], VectorDiff::Clear);
3427        assert_matches!(&diffs[1], VectorDiff::Append { values} => {
3428            assert_eq!(values.len(), 1);
3429            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
3430        });
3431
3432        assert!(stream.is_empty());
3433
3434        // No generic update is sent in this case.
3435        assert!(generic_stream.is_empty());
3436
3437        // When reading the events, we do get only the last one.
3438        let events = room_event_cache.events().await.unwrap();
3439        assert_eq!(events.len(), 1);
3440        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3441
3442        // But if we back-paginate, we don't need access to network to find out about
3443        // the previous event.
3444        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3445        assert_eq!(outcome.events.len(), 1);
3446        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3447        assert!(outcome.reached_start);
3448    }
3449
3450    #[async_test]
3451    async fn test_room_ordering() {
3452        let room_id = room_id!("!galette:saucisse.bzh");
3453
3454        let client = MockClientBuilder::new(None).build().await;
3455
3456        let f = EventFactory::new().room(room_id).sender(*ALICE);
3457
3458        let evid1 = event_id!("$1");
3459        let evid2 = event_id!("$2");
3460        let evid3 = event_id!("$3");
3461
3462        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
3463        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3464        let ev3 = f.text_msg("yo").event_id(evid3).into_event();
3465
3466        // Fill the event cache store with an initial linked chunk with 2 events chunks.
3467        {
3468            client
3469                .event_cache_store()
3470                .lock()
3471                .await
3472                .expect("Could not acquire the event cache lock")
3473                .as_clean()
3474                .expect("Could not acquire a clean event cache lock")
3475                .handle_linked_chunk_updates(
3476                    LinkedChunkId::Room(room_id),
3477                    vec![
3478                        Update::NewItemsChunk {
3479                            previous: None,
3480                            new: ChunkIdentifier::new(0),
3481                            next: None,
3482                        },
3483                        Update::PushItems {
3484                            at: Position::new(ChunkIdentifier::new(0), 0),
3485                            items: vec![ev1, ev2],
3486                        },
3487                        Update::NewItemsChunk {
3488                            previous: Some(ChunkIdentifier::new(0)),
3489                            new: ChunkIdentifier::new(1),
3490                            next: None,
3491                        },
3492                        Update::PushItems {
3493                            at: Position::new(ChunkIdentifier::new(1), 0),
3494                            items: vec![ev3.clone()],
3495                        },
3496                    ],
3497                )
3498                .await
3499                .unwrap();
3500        }
3501
3502        let event_cache = client.event_cache();
3503        event_cache.subscribe().unwrap();
3504
3505        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3506        let room = client.get_room(room_id).unwrap();
3507        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3508
3509        // Initially, the linked chunk only contains the last chunk, so only ev3 is
3510        // loaded.
3511        {
3512            let state = room_event_cache.inner.state.read().await.unwrap();
3513            let room_linked_chunk = state.room_linked_chunk();
3514
3515            // But we can get the order of ev1.
3516            assert_eq!(
3517                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
3518                Some(0)
3519            );
3520
3521            // And that of ev2 as well.
3522            assert_eq!(
3523                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
3524                Some(1)
3525            );
3526
3527            // ev3, which is loaded, also has a known ordering.
3528            let mut events = room_linked_chunk.events();
3529            let (pos, ev) = events.next().unwrap();
3530            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
3531            assert_eq!(ev.event_id().as_deref(), Some(evid3));
3532            assert_eq!(room_linked_chunk.event_order(pos), Some(2));
3533
3534            // No other loaded events.
3535            assert!(events.next().is_none());
3536        }
3537
3538        // Force loading the full linked chunk by back-paginating.
3539        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3540        assert!(outcome.reached_start);
3541
3542        // All events are now loaded, so their order is precisely their enumerated index
3543        // in a linear iteration.
3544        {
3545            let state = room_event_cache.inner.state.read().await.unwrap();
3546            let room_linked_chunk = state.room_linked_chunk();
3547
3548            for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
3549                assert_eq!(room_linked_chunk.event_order(pos), Some(i));
3550            }
3551        }
3552
3553        // Handle a gappy sync with two events (including one duplicate, so
3554        // deduplication kicks in), so that the linked chunk is shrunk to the
3555        // last chunk, and that the linked chunk only contains the last two
3556        // events.
3557        let evid4 = event_id!("$4");
3558        room_event_cache
3559            .inner
3560            .handle_joined_room_update(JoinedRoomUpdate {
3561                timeline: Timeline {
3562                    limited: true,
3563                    prev_batch: Some("fondue".to_owned()),
3564                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
3565                },
3566                ..Default::default()
3567            })
3568            .await
3569            .unwrap();
3570
3571        {
3572            let state = room_event_cache.inner.state.read().await.unwrap();
3573            let room_linked_chunk = state.room_linked_chunk();
3574
3575            // After the shrink, only evid3 and evid4 are loaded.
3576            let mut events = room_linked_chunk.events();
3577
3578            let (pos, ev) = events.next().unwrap();
3579            assert_eq!(ev.event_id().as_deref(), Some(evid3));
3580            assert_eq!(room_linked_chunk.event_order(pos), Some(2));
3581
3582            let (pos, ev) = events.next().unwrap();
3583            assert_eq!(ev.event_id().as_deref(), Some(evid4));
3584            assert_eq!(room_linked_chunk.event_order(pos), Some(3));
3585
3586            // No other loaded events.
3587            assert!(events.next().is_none());
3588
3589            // But we can still get the order of previous events.
3590            assert_eq!(
3591                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
3592                Some(0)
3593            );
3594            assert_eq!(
3595                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
3596                Some(1)
3597            );
3598
3599            // ev3 doesn't have an order with its previous position, since it's been
3600            // deduplicated.
3601            assert_eq!(
3602                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
3603                None
3604            );
3605        }
3606    }
3607
3608    #[async_test]
3609    async fn test_auto_shrink_after_all_subscribers_are_gone() {
3610        let room_id = room_id!("!galette:saucisse.bzh");
3611
3612        let client = MockClientBuilder::new(None).build().await;
3613
3614        let f = EventFactory::new().room(room_id);
3615
3616        let evid1 = event_id!("$1");
3617        let evid2 = event_id!("$2");
3618
3619        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3620        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3621
3622        // Fill the event cache store with an initial linked chunk with 2 events chunks.
3623        {
3624            client
3625                .event_cache_store()
3626                .lock()
3627                .await
3628                .expect("Could not acquire the event cache lock")
3629                .as_clean()
3630                .expect("Could not acquire a clean event cache lock")
3631                .handle_linked_chunk_updates(
3632                    LinkedChunkId::Room(room_id),
3633                    vec![
3634                        Update::NewItemsChunk {
3635                            previous: None,
3636                            new: ChunkIdentifier::new(0),
3637                            next: None,
3638                        },
3639                        Update::PushItems {
3640                            at: Position::new(ChunkIdentifier::new(0), 0),
3641                            items: vec![ev1],
3642                        },
3643                        Update::NewItemsChunk {
3644                            previous: Some(ChunkIdentifier::new(0)),
3645                            new: ChunkIdentifier::new(1),
3646                            next: None,
3647                        },
3648                        Update::PushItems {
3649                            at: Position::new(ChunkIdentifier::new(1), 0),
3650                            items: vec![ev2],
3651                        },
3652                    ],
3653                )
3654                .await
3655                .unwrap();
3656        }
3657
3658        let event_cache = client.event_cache();
3659        event_cache.subscribe().unwrap();
3660
3661        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3662        let room = client.get_room(room_id).unwrap();
3663        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3664
3665        // Sanity check: lazily loaded, so only includes one item at start.
3666        let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
3667        assert_eq!(events1.len(), 1);
3668        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
3669        assert!(stream1.is_empty());
3670
3671        // Force loading the full linked chunk by back-paginating.
3672        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3673        assert_eq!(outcome.events.len(), 1);
3674        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3675        assert!(outcome.reached_start);
3676
3677        // We also get an update about the loading from the store. Ignore it, for this
3678        // test's sake.
3679        assert_let_timeout!(
3680            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
3681        );
3682        assert_eq!(diffs.len(), 1);
3683        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3684            assert_eq!(value.event_id().as_deref(), Some(evid1));
3685        });
3686
3687        assert!(stream1.is_empty());
3688
3689        // Have another subscriber.
3690        // Since it's not the first one, and the previous one loaded some more events,
3691        // the second subscribers sees them all.
3692        let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
3693        assert_eq!(events2.len(), 2);
3694        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
3695        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
3696        assert!(stream2.is_empty());
3697
3698        // Drop the first stream, and wait a bit.
3699        drop(stream1);
3700        yield_now().await;
3701
3702        // The second stream remains undisturbed.
3703        assert!(stream2.is_empty());
3704
3705        // Now drop the second stream, and wait a bit.
3706        drop(stream2);
3707        yield_now().await;
3708
3709        // The linked chunk must have auto-shrunk by now.
3710
3711        {
3712            // Check the inner state: there's no more shared auto-shrinker.
3713            let state = room_event_cache.inner.state.read().await.unwrap();
3714            assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
3715        }
3716
3717        // Getting the events will only give us the latest chunk.
3718        let events3 = room_event_cache.events().await.unwrap();
3719        assert_eq!(events3.len(), 1);
3720        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
3721    }
3722
3723    #[async_test]
3724    async fn test_rfind_map_event_in_memory_by() {
3725        let user_id = user_id!("@mnt_io:matrix.org");
3726        let room_id = room_id!("!raclette:patate.ch");
3727        let client = MockClientBuilder::new(None).build().await;
3728
3729        let event_factory = EventFactory::new().room(room_id);
3730
3731        let event_id_0 = event_id!("$ev0");
3732        let event_id_1 = event_id!("$ev1");
3733        let event_id_2 = event_id!("$ev2");
3734        let event_id_3 = event_id!("$ev3");
3735
3736        let event_0 =
3737            event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3738        let event_1 =
3739            event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3740        let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3741        let event_3 =
3742            event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3743
3744        // Fill the event cache store with an initial linked chunk of 2 chunks, and 4
3745        // events.
3746        {
3747            client
3748                .event_cache_store()
3749                .lock()
3750                .await
3751                .expect("Could not acquire the event cache lock")
3752                .as_clean()
3753                .expect("Could not acquire a clean event cache lock")
3754                .handle_linked_chunk_updates(
3755                    LinkedChunkId::Room(room_id),
3756                    vec![
3757                        Update::NewItemsChunk {
3758                            previous: None,
3759                            new: ChunkIdentifier::new(0),
3760                            next: None,
3761                        },
3762                        Update::PushItems {
3763                            at: Position::new(ChunkIdentifier::new(0), 0),
3764                            items: vec![event_3],
3765                        },
3766                        Update::NewItemsChunk {
3767                            previous: Some(ChunkIdentifier::new(0)),
3768                            new: ChunkIdentifier::new(1),
3769                            next: None,
3770                        },
3771                        Update::PushItems {
3772                            at: Position::new(ChunkIdentifier::new(1), 0),
3773                            items: vec![event_0, event_1, event_2],
3774                        },
3775                    ],
3776                )
3777                .await
3778                .unwrap();
3779        }
3780
3781        let event_cache = client.event_cache();
3782        event_cache.subscribe().unwrap();
3783
3784        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3785        let room = client.get_room(room_id).unwrap();
3786        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3787
3788        // Look for an event from `BOB`: it must be `event_0`.
3789        assert_matches!(
3790            room_event_cache
3791                .rfind_map_event_in_memory_by(|event| {
3792                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| event.event_id())
3793                })
3794                .await,
3795            Ok(Some(event_id)) => {
3796                assert_eq!(event_id.as_deref(), Some(event_id_0));
3797            }
3798        );
3799
3800        // Look for an event from `ALICE`: it must be `event_2`, right before `event_1`
3801        // because events are looked for in reverse order.
3802        assert_matches!(
3803            room_event_cache
3804                .rfind_map_event_in_memory_by(|event| {
3805                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| event.event_id())
3806                })
3807                .await,
3808            Ok(Some(event_id)) => {
3809                assert_eq!(event_id.as_deref(), Some(event_id_2));
3810            }
3811        );
3812
3813        // Look for an event that is inside the storage, but not loaded.
3814        assert!(
3815            room_event_cache
3816                .rfind_map_event_in_memory_by(|event| {
3817                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
3818                        == Some(user_id))
3819                    .then(|| event.event_id())
3820                })
3821                .await
3822                .unwrap()
3823                .is_none()
3824        );
3825
3826        // Look for an event that doesn't exist.
3827        assert!(
3828            room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap().is_none()
3829        );
3830    }
3831
3832    #[async_test]
3833    async fn test_reload_when_dirty() {
3834        let user_id = user_id!("@mnt_io:matrix.org");
3835        let room_id = room_id!("!raclette:patate.ch");
3836
3837        // The storage shared by the two clients.
3838        let event_cache_store = MemoryStore::new();
3839
3840        // Client for the process 0.
3841        let client_p0 = MockClientBuilder::new(None)
3842            .on_builder(|builder| {
3843                builder.store_config(
3844                    StoreConfig::new("process #0".to_owned())
3845                        .event_cache_store(event_cache_store.clone()),
3846                )
3847            })
3848            .build()
3849            .await;
3850
3851        // Client for the process 1.
3852        let client_p1 = MockClientBuilder::new(None)
3853            .on_builder(|builder| {
3854                builder.store_config(
3855                    StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
3856                )
3857            })
3858            .build()
3859            .await;
3860
3861        let event_factory = EventFactory::new().room(room_id).sender(user_id);
3862
3863        let ev_id_0 = event_id!("$ev_0");
3864        let ev_id_1 = event_id!("$ev_1");
3865
3866        let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
3867        let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
3868
3869        // Add events to the storage (shared by the two clients!).
3870        client_p0
3871            .event_cache_store()
3872            .lock()
3873            .await
3874            .expect("[p0] Could not acquire the event cache lock")
3875            .as_clean()
3876            .expect("[p0] Could not acquire a clean event cache lock")
3877            .handle_linked_chunk_updates(
3878                LinkedChunkId::Room(room_id),
3879                vec![
3880                    Update::NewItemsChunk {
3881                        previous: None,
3882                        new: ChunkIdentifier::new(0),
3883                        next: None,
3884                    },
3885                    Update::PushItems {
3886                        at: Position::new(ChunkIdentifier::new(0), 0),
3887                        items: vec![ev_0],
3888                    },
3889                    Update::NewItemsChunk {
3890                        previous: Some(ChunkIdentifier::new(0)),
3891                        new: ChunkIdentifier::new(1),
3892                        next: None,
3893                    },
3894                    Update::PushItems {
3895                        at: Position::new(ChunkIdentifier::new(1), 0),
3896                        items: vec![ev_1],
3897                    },
3898                ],
3899            )
3900            .await
3901            .unwrap();
3902
3903        // Subscribe the event caches, and create the room.
3904        let (room_event_cache_p0, room_event_cache_p1) = {
3905            let event_cache_p0 = client_p0.event_cache();
3906            event_cache_p0.subscribe().unwrap();
3907
3908            let event_cache_p1 = client_p1.event_cache();
3909            event_cache_p1.subscribe().unwrap();
3910
3911            client_p0.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3912            client_p1.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3913
3914            let (room_event_cache_p0, _drop_handles) =
3915                client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
3916            let (room_event_cache_p1, _drop_handles) =
3917                client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
3918
3919            (room_event_cache_p0, room_event_cache_p1)
3920        };
3921
3922        // Okay. We are ready for the test!
3923        //
3924        // First off, let's check `room_event_cache_p0` has access to the first event
3925        // loaded in-memory, then do a pagination, and see more events.
3926        let mut updates_stream_p0 = {
3927            let room_event_cache = &room_event_cache_p0;
3928
3929            let (initial_updates, mut updates_stream) =
3930                room_event_cache_p0.subscribe().await.unwrap();
3931
3932            // Initial updates contain `ev_id_1` only.
3933            assert_eq!(initial_updates.len(), 1);
3934            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
3935            assert!(updates_stream.is_empty());
3936
3937            // `ev_id_1` must be loaded in memory.
3938            assert!(event_loaded(room_event_cache, ev_id_1).await);
3939
3940            // `ev_id_0` must NOT be loaded in memory.
3941            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
3942
3943            // Load one more event with a backpagination.
3944            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
3945
3946            // A new update for `ev_id_0` must be present.
3947            assert_matches!(
3948                updates_stream.recv().await.unwrap(),
3949                RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
3950                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
3951                    assert_matches!(
3952                        &diffs[0],
3953                        VectorDiff::Insert { index: 0, value: event } => {
3954                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
3955                        }
3956                    );
3957                }
3958            );
3959
3960            // `ev_id_0` must now be loaded in memory.
3961            assert!(event_loaded(room_event_cache, ev_id_0).await);
3962
3963            updates_stream
3964        };
3965
3966        // Second, let's check `room_event_cache_p1` has the same accesses.
3967        let mut updates_stream_p1 = {
3968            let room_event_cache = &room_event_cache_p1;
3969            let (initial_updates, mut updates_stream) =
3970                room_event_cache_p1.subscribe().await.unwrap();
3971
3972            // Initial updates contain `ev_id_1` only.
3973            assert_eq!(initial_updates.len(), 1);
3974            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
3975            assert!(updates_stream.is_empty());
3976
3977            // `ev_id_1` must be loaded in memory.
3978            assert!(event_loaded(room_event_cache, ev_id_1).await);
3979
3980            // `ev_id_0` must NOT be loaded in memory.
3981            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
3982
3983            // Load one more event with a backpagination.
3984            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
3985
3986            // A new update for `ev_id_0` must be present.
3987            assert_matches!(
3988                updates_stream.recv().await.unwrap(),
3989                RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
3990                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
3991                    assert_matches!(
3992                        &diffs[0],
3993                        VectorDiff::Insert { index: 0, value: event } => {
3994                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
3995                        }
3996                    );
3997                }
3998            );
3999
4000            // `ev_id_0` must now be loaded in memory.
4001            assert!(event_loaded(room_event_cache, ev_id_0).await);
4002
4003            updates_stream
4004        };
4005
4006        // Do this a couple times, for the fun.
4007        for _ in 0..3 {
4008            // Third, because `room_event_cache_p1` has locked the store, the lock
4009            // is dirty for `room_event_cache_p0`, so it will shrink to its last
4010            // chunk!
4011            {
4012                let room_event_cache = &room_event_cache_p0;
4013                let updates_stream = &mut updates_stream_p0;
4014
4015                // `ev_id_1` must be loaded in memory, just like before.
4016                assert!(event_loaded(room_event_cache, ev_id_1).await);
4017
4018                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
4019                // state has been reloaded to its last chunk.
4020                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4021
4022                // The reload can be observed via the updates too.
4023                assert_matches!(
4024                    updates_stream.recv().await.unwrap(),
4025                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4026                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4027                        assert_matches!(&diffs[0], VectorDiff::Clear);
4028                        assert_matches!(
4029                            &diffs[1],
4030                            VectorDiff::Append { values: events } => {
4031                                assert_eq!(events.len(), 1);
4032                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4033                            }
4034                        );
4035                    }
4036                );
4037
4038                // Load one more event with a backpagination.
4039                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4040
4041                // `ev_id_0` must now be loaded in memory.
4042                assert!(event_loaded(room_event_cache, ev_id_0).await);
4043
4044                // The pagination can be observed via the updates too.
4045                assert_matches!(
4046                    updates_stream.recv().await.unwrap(),
4047                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4048                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4049                        assert_matches!(
4050                            &diffs[0],
4051                            VectorDiff::Insert { index: 0, value: event } => {
4052                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4053                            }
4054                        );
4055                    }
4056                );
4057            }
4058
4059            // Fourth, because `room_event_cache_p0` has locked the store again, the lock
4060            // is dirty for `room_event_cache_p1` too!, so it will shrink to its last
4061            // chunk!
4062            {
4063                let room_event_cache = &room_event_cache_p1;
4064                let updates_stream = &mut updates_stream_p1;
4065
4066                // `ev_id_1` must be loaded in memory, just like before.
4067                assert!(event_loaded(room_event_cache, ev_id_1).await);
4068
4069                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
4070                // state has shrunk to its last chunk.
4071                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4072
4073                // The reload can be observed via the updates too.
4074                assert_matches!(
4075                    updates_stream.recv().await.unwrap(),
4076                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4077                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4078                        assert_matches!(&diffs[0], VectorDiff::Clear);
4079                        assert_matches!(
4080                            &diffs[1],
4081                            VectorDiff::Append { values: events } => {
4082                                assert_eq!(events.len(), 1);
4083                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4084                            }
4085                        );
4086                    }
4087                );
4088
4089                // Load one more event with a backpagination.
4090                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4091
4092                // `ev_id_0` must now be loaded in memory.
4093                assert!(event_loaded(room_event_cache, ev_id_0).await);
4094
4095                // The pagination can be observed via the updates too.
4096                assert_matches!(
4097                    updates_stream.recv().await.unwrap(),
4098                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4099                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4100                        assert_matches!(
4101                            &diffs[0],
4102                            VectorDiff::Insert { index: 0, value: event } => {
4103                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4104                            }
4105                        );
4106                    }
4107                );
4108            }
4109        }
4110
4111        // Repeat that with an explicit read lock (so that we don't rely on
4112        // `event_loaded` to trigger the dirty detection).
4113        for _ in 0..3 {
4114            {
4115                let room_event_cache = &room_event_cache_p0;
4116                let updates_stream = &mut updates_stream_p0;
4117
4118                let guard = room_event_cache.inner.state.read().await.unwrap();
4119
4120                // Guard is kept alive, to ensure we can have multiple read guards alive with a
4121                // shared access.
4122                // See `RoomEventCacheStateLock::read` to learn more.
4123
4124                // The lock is no longer marked as dirty, it's been cleaned.
4125                assert!(guard.is_dirty().not());
4126
4127                // The reload can be observed via the updates too.
4128                assert_matches!(
4129                    updates_stream.recv().await.unwrap(),
4130                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4131                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4132                        assert_matches!(&diffs[0], VectorDiff::Clear);
4133                        assert_matches!(
4134                            &diffs[1],
4135                            VectorDiff::Append { values: events } => {
4136                                assert_eq!(events.len(), 1);
4137                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4138                            }
4139                        );
4140                    }
4141                );
4142
4143                assert!(event_loaded(room_event_cache, ev_id_1).await);
4144                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4145
4146                // Ensure `guard` is alive up to this point (in case this test is refactored, I
4147                // want to make this super explicit).
4148                //
4149                // We drop need to drop it before the pagination because the pagination needs to
4150                // obtain a write lock.
4151                drop(guard);
4152
4153                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4154                assert!(event_loaded(room_event_cache, ev_id_0).await);
4155
4156                // The pagination can be observed via the updates too.
4157                assert_matches!(
4158                    updates_stream.recv().await.unwrap(),
4159                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4160                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4161                        assert_matches!(
4162                            &diffs[0],
4163                            VectorDiff::Insert { index: 0, value: event } => {
4164                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4165                            }
4166                        );
4167                    }
4168                );
4169            }
4170
4171            {
4172                let room_event_cache = &room_event_cache_p1;
4173                let updates_stream = &mut updates_stream_p1;
4174
4175                let guard = room_event_cache.inner.state.read().await.unwrap();
4176
4177                // Guard is kept alive, to ensure we can have multiple read guards alive with a
4178                // shared access.
4179
4180                // The lock is no longer marked as dirty, it's been cleaned.
4181                assert!(guard.is_dirty().not());
4182
4183                // The reload can be observed via the updates too.
4184                assert_matches!(
4185                    updates_stream.recv().await.unwrap(),
4186                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4187                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4188                        assert_matches!(&diffs[0], VectorDiff::Clear);
4189                        assert_matches!(
4190                            &diffs[1],
4191                            VectorDiff::Append { values: events } => {
4192                                assert_eq!(events.len(), 1);
4193                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4194                            }
4195                        );
4196                    }
4197                );
4198
4199                assert!(event_loaded(room_event_cache, ev_id_1).await);
4200                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4201
4202                // Ensure `guard` is alive up to this point (in case this test is refactored, I
4203                // want to make this super explicit).
4204                //
4205                // We drop need to drop it before the pagination because the pagination needs to
4206                // obtain a write lock.
4207                drop(guard);
4208
4209                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4210                assert!(event_loaded(room_event_cache, ev_id_0).await);
4211
4212                // The pagination can be observed via the updates too.
4213                assert_matches!(
4214                    updates_stream.recv().await.unwrap(),
4215                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4216                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4217                        assert_matches!(
4218                            &diffs[0],
4219                            VectorDiff::Insert { index: 0, value: event } => {
4220                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4221                            }
4222                        );
4223                    }
4224                );
4225            }
4226        }
4227
4228        // Repeat that with an explicit write lock.
4229        for _ in 0..3 {
4230            {
4231                let room_event_cache = &room_event_cache_p0;
4232                let updates_stream = &mut updates_stream_p0;
4233
4234                let guard = room_event_cache.inner.state.write().await.unwrap();
4235
4236                // The lock is no longer marked as dirty, it's been cleaned.
4237                assert!(guard.is_dirty().not());
4238
4239                // The reload can be observed via the updates too.
4240                assert_matches!(
4241                    updates_stream.recv().await.unwrap(),
4242                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4243                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4244                        assert_matches!(&diffs[0], VectorDiff::Clear);
4245                        assert_matches!(
4246                            &diffs[1],
4247                            VectorDiff::Append { values: events } => {
4248                                assert_eq!(events.len(), 1);
4249                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4250                            }
4251                        );
4252                    }
4253                );
4254
4255                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
4256                // needs to obtain a read lock.
4257                drop(guard);
4258
4259                assert!(event_loaded(room_event_cache, ev_id_1).await);
4260                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4261
4262                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4263                assert!(event_loaded(room_event_cache, ev_id_0).await);
4264
4265                // The pagination can be observed via the updates too.
4266                assert_matches!(
4267                    updates_stream.recv().await.unwrap(),
4268                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4269                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4270                        assert_matches!(
4271                            &diffs[0],
4272                            VectorDiff::Insert { index: 0, value: event } => {
4273                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4274                            }
4275                        );
4276                    }
4277                );
4278            }
4279
4280            {
4281                let room_event_cache = &room_event_cache_p1;
4282                let updates_stream = &mut updates_stream_p1;
4283
4284                let guard = room_event_cache.inner.state.write().await.unwrap();
4285
4286                // The lock is no longer marked as dirty, it's been cleaned.
4287                assert!(guard.is_dirty().not());
4288
4289                // The reload can be observed via the updates too.
4290                assert_matches!(
4291                    updates_stream.recv().await.unwrap(),
4292                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4293                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4294                        assert_matches!(&diffs[0], VectorDiff::Clear);
4295                        assert_matches!(
4296                            &diffs[1],
4297                            VectorDiff::Append { values: events } => {
4298                                assert_eq!(events.len(), 1);
4299                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4300                            }
4301                        );
4302                    }
4303                );
4304
4305                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
4306                // needs to obtain a read lock.
4307                drop(guard);
4308
4309                assert!(event_loaded(room_event_cache, ev_id_1).await);
4310                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4311
4312                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4313                assert!(event_loaded(room_event_cache, ev_id_0).await);
4314
4315                // The pagination can be observed via the updates too.
4316                assert_matches!(
4317                    updates_stream.recv().await.unwrap(),
4318                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4319                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4320                        assert_matches!(
4321                            &diffs[0],
4322                            VectorDiff::Insert { index: 0, value: event } => {
4323                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4324                            }
4325                        );
4326                    }
4327                );
4328            }
4329        }
4330    }
4331
4332    #[async_test]
4333    async fn test_load_when_dirty() {
4334        let room_id_0 = room_id!("!raclette:patate.ch");
4335        let room_id_1 = room_id!("!morbiflette:patate.ch");
4336
4337        // The storage shared by the two clients.
4338        let event_cache_store = MemoryStore::new();
4339
4340        // Client for the process 0.
4341        let client_p0 = MockClientBuilder::new(None)
4342            .on_builder(|builder| {
4343                builder.store_config(
4344                    StoreConfig::new("process #0".to_owned())
4345                        .event_cache_store(event_cache_store.clone()),
4346                )
4347            })
4348            .build()
4349            .await;
4350
4351        // Client for the process 1.
4352        let client_p1 = MockClientBuilder::new(None)
4353            .on_builder(|builder| {
4354                builder.store_config(
4355                    StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
4356                )
4357            })
4358            .build()
4359            .await;
4360
4361        // Subscribe the event caches, and create the room.
4362        let (room_event_cache_0_p0, room_event_cache_0_p1) = {
4363            let event_cache_p0 = client_p0.event_cache();
4364            event_cache_p0.subscribe().unwrap();
4365
4366            let event_cache_p1 = client_p1.event_cache();
4367            event_cache_p1.subscribe().unwrap();
4368
4369            client_p0
4370                .base_client()
4371                .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4372            client_p0
4373                .base_client()
4374                .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4375
4376            client_p1
4377                .base_client()
4378                .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4379            client_p1
4380                .base_client()
4381                .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4382
4383            let (room_event_cache_0_p0, _drop_handles) =
4384                client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4385            let (room_event_cache_0_p1, _drop_handles) =
4386                client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4387
4388            (room_event_cache_0_p0, room_event_cache_0_p1)
4389        };
4390
4391        // Let's make the cross-process lock over the store dirty.
4392        {
4393            drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
4394            drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
4395        }
4396
4397        // Create the `RoomEventCache` for `room_id_1`. During its creation, the
4398        // cross-process lock over the store MUST be dirty, which makes no difference as
4399        // a clean one: the state is just loaded, not reloaded.
4400        let (room_event_cache_1_p0, _) =
4401            client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
4402
4403        // Check the lock isn't dirty because it's been cleared.
4404        {
4405            let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
4406            assert!(guard.is_dirty().not());
4407        }
4408
4409        // The only way to test this behaviour is to see that the dirty block in
4410        // `RoomEventCacheStateLock` is covered by this test.
4411    }
4412
4413    async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
4414        room_event_cache
4415            .rfind_map_event_in_memory_by(|event| {
4416                (event.event_id().as_deref() == Some(event_id)).then_some(())
4417            })
4418            .await
4419            .unwrap()
4420            .is_some()
4421    }
4422}