matrix_sdk/event_cache/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All event cache types for a single room.
16
17use std::{collections::BTreeMap, fmt, sync::Arc};
18
19use events::Gap;
20use eyeball_im::VectorDiff;
21use matrix_sdk_base::{
22    deserialized_responses::{AmbiguityChange, TimelineEvent},
23    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
24};
25use ruma::{
26    events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
27    serde::Raw,
28    EventId, OwnedEventId, OwnedRoomId, RoomVersionId,
29};
30use tokio::sync::{
31    broadcast::{Receiver, Sender},
32    Notify, RwLock,
33};
34use tracing::{trace, warn};
35
36use super::{
37    paginator::{Paginator, PaginatorState},
38    AllEventsCache, EventsOrigin, Result, RoomEventCacheUpdate, RoomPagination,
39};
40use crate::{client::WeakClient, room::WeakRoom};
41
42pub(super) mod events;
43
44/// A subset of an event cache, for a room.
45///
46/// Cloning is shallow, and thus is cheap to do.
47#[derive(Clone)]
48pub struct RoomEventCache {
49    pub(super) inner: Arc<RoomEventCacheInner>,
50}
51
52impl fmt::Debug for RoomEventCache {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        f.debug_struct("RoomEventCache").finish_non_exhaustive()
55    }
56}
57
58impl RoomEventCache {
59    /// Create a new [`RoomEventCache`] using the given room and store.
60    pub(super) fn new(
61        client: WeakClient,
62        state: RoomEventCacheState,
63        room_id: OwnedRoomId,
64        room_version: RoomVersionId,
65        all_events_cache: Arc<RwLock<AllEventsCache>>,
66    ) -> Self {
67        Self {
68            inner: Arc::new(RoomEventCacheInner::new(
69                client,
70                state,
71                room_id,
72                room_version,
73                all_events_cache,
74            )),
75        }
76    }
77
78    /// Subscribe to this room updates, after getting the initial list of
79    /// events.
80    pub async fn subscribe(&self) -> (Vec<TimelineEvent>, Receiver<RoomEventCacheUpdate>) {
81        let state = self.inner.state.read().await;
82        let events = state.events().events().map(|(_position, item)| item.clone()).collect();
83
84        (events, self.inner.sender.subscribe())
85    }
86
87    /// Return a [`RoomPagination`] API object useful for running
88    /// back-pagination queries in the current room.
89    pub fn pagination(&self) -> RoomPagination {
90        RoomPagination { inner: self.inner.clone() }
91    }
92
93    /// Try to find an event by id in this room.
94    pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
95        if let Some((room_id, event)) =
96            self.inner.all_events.read().await.events.get(event_id).cloned()
97        {
98            if room_id == self.inner.room_id {
99                return Some(event);
100            }
101        }
102
103        let state = self.inner.state.read().await;
104        for (_pos, event) in state.events().revents() {
105            if event.event_id().as_deref() == Some(event_id) {
106                return Some(event.clone());
107            }
108        }
109        None
110    }
111
112    /// Try to find an event by id in this room, along with its related events.
113    ///
114    /// You can filter which types of related events to retrieve using
115    /// `filter`. `None` will retrieve related events of any type.
116    pub async fn event_with_relations(
117        &self,
118        event_id: &EventId,
119        filter: Option<Vec<RelationType>>,
120    ) -> Option<(TimelineEvent, Vec<TimelineEvent>)> {
121        let cache = self.inner.all_events.read().await;
122        if let Some((_, event)) = cache.events.get(event_id) {
123            let related_events = cache.collect_related_events(event_id, filter.as_deref());
124            Some((event.clone(), related_events))
125        } else {
126            None
127        }
128    }
129
130    /// Clear all the storage for this [`RoomEventCache`].
131    ///
132    /// This will get rid of all the events from the linked chunk and persisted
133    /// storage.
134    pub async fn clear(&self) -> Result<()> {
135        // Clear the linked chunk and persisted storage.
136        let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;
137
138        // Clear the (temporary) events mappings.
139        self.inner.all_events.write().await.clear();
140
141        // Reset the paginator.
142        // TODO: properly stop any ongoing back-pagination.
143        let _ = self.inner.paginator.set_idle_state(PaginatorState::Initial, None, None);
144
145        // Notify observers about the update.
146        let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
147            diffs: updates_as_vector_diffs,
148            origin: EventsOrigin::Sync,
149        });
150
151        Ok(())
152    }
153
154    /// Save a single event in the event cache, for further retrieval with
155    /// [`Self::event`].
156    // TODO: This doesn't insert the event into the linked chunk. In the future
157    // there'll be no distinction between the linked chunk and the separate
158    // cache. There is a discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/3886.
159    pub(crate) async fn save_event(&self, event: TimelineEvent) {
160        if let Some(event_id) = event.event_id() {
161            let mut cache = self.inner.all_events.write().await;
162
163            cache.append_related_event(&event);
164            cache.events.insert(event_id, (self.inner.room_id.clone(), event));
165        } else {
166            warn!("couldn't save event without event id in the event cache");
167        }
168    }
169
170    /// Save some events in the event cache, for further retrieval with
171    /// [`Self::event`]. This function will save them using a single lock,
172    /// as opposed to [`Self::save_event`].
173    // TODO: This doesn't insert the event into the linked chunk. In the future
174    // there'll be no distinction between the linked chunk and the separate
175    // cache. There is a discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/3886.
176    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = TimelineEvent>) {
177        let mut cache = self.inner.all_events.write().await;
178        for event in events {
179            if let Some(event_id) = event.event_id() {
180                cache.append_related_event(&event);
181                cache.events.insert(event_id, (self.inner.room_id.clone(), event));
182            } else {
183                warn!("couldn't save event without event id in the event cache");
184            }
185        }
186    }
187
188    /// Return a nice debug string (a vector of lines) for the linked chunk of
189    /// events for this room.
190    pub async fn debug_string(&self) -> Vec<String> {
191        self.inner.state.read().await.events().debug_string()
192    }
193}
194
195/// The (non-cloneable) details of the `RoomEventCache`.
196pub(super) struct RoomEventCacheInner {
197    /// The room id for this room.
198    room_id: OwnedRoomId,
199
200    /// The room version for this room.
201    pub(crate) room_version: RoomVersionId,
202
203    /// Sender part for subscribers to this room.
204    pub sender: Sender<RoomEventCacheUpdate>,
205
206    /// State for this room's event cache.
207    pub state: RwLock<RoomEventCacheState>,
208
209    /// See comment of [`super::EventCacheInner::all_events`].
210    ///
211    /// This is shared between the [`super::EventCacheInner`] singleton and all
212    /// [`RoomEventCacheInner`] instances.
213    all_events: Arc<RwLock<AllEventsCache>>,
214
215    /// A notifier that we received a new pagination token.
216    pub pagination_batch_token_notifier: Notify,
217
218    /// A paginator instance, that's configured to run back-pagination on our
219    /// behalf.
220    ///
221    /// Note: forward-paginations are still run "out-of-band", that is,
222    /// disconnected from the event cache, as we don't implement matching
223    /// events received from those kinds of pagination with the cache. This
224    /// paginator is only used for queries that interact with the actual event
225    /// cache.
226    pub paginator: Paginator<WeakRoom>,
227}
228
229impl RoomEventCacheInner {
230    /// Creates a new cache for a room, and subscribes to room updates, so as
231    /// to handle new timeline events.
232    fn new(
233        client: WeakClient,
234        state: RoomEventCacheState,
235        room_id: OwnedRoomId,
236        room_version: RoomVersionId,
237        all_events_cache: Arc<RwLock<AllEventsCache>>,
238    ) -> Self {
239        let sender = Sender::new(32);
240        let weak_room = WeakRoom::new(client, room_id);
241        Self {
242            room_id: weak_room.room_id().to_owned(),
243            room_version,
244            state: RwLock::new(state),
245            all_events: all_events_cache,
246            sender,
247            pagination_batch_token_notifier: Default::default(),
248            paginator: Paginator::new(weak_room),
249        }
250    }
251
252    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
253        if account_data.is_empty() {
254            return;
255        }
256
257        let mut handled_read_marker = false;
258
259        trace!("Handling account data");
260
261        for raw_event in account_data {
262            match raw_event.deserialize() {
263                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
264                    // If duplicated, do not forward read marker multiple times
265                    // to avoid clutter the update channel.
266                    if handled_read_marker {
267                        continue;
268                    }
269
270                    handled_read_marker = true;
271
272                    // Propagate to observers. (We ignore the error if there aren't any.)
273                    let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
274                        event_id: ev.content.event_id,
275                    });
276                }
277
278                Ok(_) => {
279                    // We're not interested in other room account data updates,
280                    // at this point.
281                }
282
283                Err(e) => {
284                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
285                    warn!(event_type, "Failed to deserialize account data: {e}");
286                }
287            }
288        }
289    }
290
291    pub(super) async fn handle_joined_room_update(
292        &self,
293        has_storage: bool,
294        updates: JoinedRoomUpdate,
295    ) -> Result<()> {
296        self.handle_timeline(
297            has_storage,
298            updates.timeline,
299            updates.ephemeral.clone(),
300            updates.ambiguity_changes,
301        )
302        .await?;
303
304        self.handle_account_data(updates.account_data);
305
306        Ok(())
307    }
308
309    async fn handle_timeline(
310        &self,
311        has_storage: bool,
312        timeline: Timeline,
313        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
314        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
315    ) -> Result<()> {
316        if !has_storage && timeline.limited {
317            // Ideally we'd try to reconcile existing events against those received in the
318            // timeline, but we're not there yet. In the meanwhile, clear the
319            // items from the room. TODO: implement Smart Matching™.
320            trace!("limited timeline, clearing all previous events and pushing new events");
321
322            self.replace_all_events_by(
323                timeline.events,
324                timeline.prev_batch,
325                ephemeral_events,
326                ambiguity_changes,
327            )
328            .await?;
329        } else {
330            // Add all the events to the backend.
331            trace!("adding new events");
332
333            // If we have storage, only keep the previous-batch token if we have a limited
334            // timeline. Otherwise, we know about all the events, and we don't need to
335            // back-paginate, so we wouldn't make use of the given previous-batch token.
336            //
337            // If we don't have storage, even if the timeline isn't limited, we may not have
338            // saved the previous events in any cache, so we should always be
339            // able to retrieve those.
340            let prev_batch =
341                if has_storage && !timeline.limited { None } else { timeline.prev_batch };
342
343            let mut state = self.state.write().await;
344            self.append_events_locked(
345                &mut state,
346                timeline.events,
347                prev_batch,
348                ephemeral_events,
349                ambiguity_changes,
350            )
351            .await?;
352        }
353
354        Ok(())
355    }
356
357    pub(super) async fn handle_left_room_update(
358        &self,
359        has_storage: bool,
360        updates: LeftRoomUpdate,
361    ) -> Result<()> {
362        self.handle_timeline(has_storage, updates.timeline, Vec::new(), updates.ambiguity_changes)
363            .await?;
364        Ok(())
365    }
366
367    /// Remove existing events, and append a set of events to the room cache and
368    /// storage, notifying observers.
369    pub(super) async fn replace_all_events_by(
370        &self,
371        sync_timeline_events: Vec<TimelineEvent>,
372        prev_batch: Option<String>,
373        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
374        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
375    ) -> Result<()> {
376        // Acquire the lock.
377        let mut state = self.state.write().await;
378
379        // Reset the room's state.
380        let updates_as_vector_diffs = state.reset().await?;
381
382        // Propagate to observers.
383        let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
384            diffs: updates_as_vector_diffs,
385            origin: EventsOrigin::Sync,
386        });
387
388        // Push the new events.
389        self.append_events_locked(
390            &mut state,
391            sync_timeline_events,
392            prev_batch.clone(),
393            ephemeral_events,
394            ambiguity_changes,
395        )
396        .await?;
397
398        // Reset the paginator status to initial.
399        self.paginator.set_idle_state(PaginatorState::Initial, prev_batch, None)?;
400
401        Ok(())
402    }
403
404    /// Append a set of events to the room cache and storage, notifying
405    /// observers.
406    ///
407    /// This is a private implementation. It must not be exposed publicly.
408    async fn append_events_locked(
409        &self,
410        state: &mut RoomEventCacheState,
411        sync_timeline_events: Vec<TimelineEvent>,
412        prev_batch: Option<String>,
413        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
414        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
415    ) -> Result<()> {
416        if sync_timeline_events.is_empty()
417            && prev_batch.is_none()
418            && ephemeral_events.is_empty()
419            && ambiguity_changes.is_empty()
420        {
421            return Ok(());
422        }
423
424        let (events, duplicated_event_ids, all_duplicates) =
425            state.collect_valid_and_duplicated_events(sync_timeline_events.clone()).await?;
426
427        // During a sync, when a duplicated event is found, the old event is removed and
428        // the new event is added. This is the opposite strategy than during a backwards
429        // pagination where the old event is kept and the new event is ignored.
430        //
431        // Let's remove the old events that are duplicated.
432        let sync_timeline_events_diffs = if all_duplicates {
433            // No new events, thus no need to change the room events.
434            vec![]
435        } else {
436            // Add the previous back-pagination token (if present), followed by the timeline
437            // events themselves.
438            let (_, sync_timeline_events_diffs) = state
439                .with_events_mut(|room_events| {
440                    if let Some(prev_token) = &prev_batch {
441                        room_events.push_gap(Gap { prev_token: prev_token.clone() });
442                    }
443
444                    // Remove the old duplicated events.
445                    //
446                    // We don't have to worry the removals can change the position of the
447                    // existing events, because we are pushing all _new_
448                    // `events` at the back.
449                    room_events.remove_events_by_id(duplicated_event_ids);
450
451                    // Push the new events.
452                    room_events.push_events(events.clone());
453
454                    room_events.on_new_events(&self.room_version, events.iter());
455                })
456                .await?;
457
458            {
459                // Fill the AllEventsCache.
460                let mut all_events = self.all_events.write().await;
461                for sync_timeline_event in sync_timeline_events {
462                    if let Some(event_id) = sync_timeline_event.event_id() {
463                        all_events.append_related_event(&sync_timeline_event);
464                        all_events.events.insert(
465                            event_id.to_owned(),
466                            (self.room_id.clone(), sync_timeline_event),
467                        );
468                    }
469                }
470            }
471
472            sync_timeline_events_diffs
473        };
474
475        // Now that all events have been added, we can trigger the
476        // `pagination_token_notifier`.
477        if prev_batch.is_some() {
478            self.pagination_batch_token_notifier.notify_one();
479        }
480
481        // The order of `RoomEventCacheUpdate`s is **really** important here.
482        {
483            if !sync_timeline_events_diffs.is_empty() {
484                let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
485                    diffs: sync_timeline_events_diffs,
486                    origin: EventsOrigin::Sync,
487                });
488            }
489
490            if !ephemeral_events.is_empty() {
491                let _ = self
492                    .sender
493                    .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
494            }
495
496            if !ambiguity_changes.is_empty() {
497                let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
498            }
499        }
500
501        Ok(())
502    }
503}
504
505/// Internal type to represent the output of
506/// `RoomEventCacheState::load_more_events_backwards`.
507#[derive(Debug)]
508pub(super) enum LoadMoreEventsBackwardsOutcome {
509    /// A gap has been inserted.
510    Gap,
511
512    /// The start of the timeline has been reached.
513    StartOfTimeline,
514
515    /// Events have been inserted.
516    Events(Vec<TimelineEvent>, Vec<VectorDiff<TimelineEvent>>),
517}
518
519// Use a private module to hide `events` to this parent module.
520mod private {
521    use std::sync::Arc;
522
523    use eyeball_im::VectorDiff;
524    use matrix_sdk_base::{
525        deserialized_responses::{TimelineEvent, TimelineEventKind},
526        event_cache::{store::EventCacheStoreLock, Event},
527        linked_chunk::{lazy_loader, ChunkContent, Update},
528    };
529    use matrix_sdk_common::executor::spawn;
530    use once_cell::sync::OnceCell;
531    use ruma::{serde::Raw, OwnedEventId, OwnedRoomId};
532    use tracing::{error, instrument, trace};
533
534    use super::{events::RoomEvents, LoadMoreEventsBackwardsOutcome};
535    use crate::event_cache::{deduplicator::Deduplicator, EventCacheError};
536
537    /// State for a single room's event cache.
538    ///
539    /// This contains all the inner mutable states that ought to be updated at
540    /// the same time.
541    pub struct RoomEventCacheState {
542        /// The room this state relates to.
543        room: OwnedRoomId,
544
545        /// Reference to the underlying backing store.
546        ///
547        /// Set to none if the room shouldn't read the linked chunk from
548        /// storage, and shouldn't store updates to storage.
549        store: Arc<OnceCell<EventCacheStoreLock>>,
550
551        /// The events of the room.
552        events: RoomEvents,
553
554        /// The events deduplicator instance to help finding duplicates.
555        deduplicator: Deduplicator,
556
557        /// Have we ever waited for a previous-batch-token to come from sync, in
558        /// the context of pagination? We do this at most once per room,
559        /// the first time we try to run backward pagination. We reset
560        /// that upon clearing the timeline events.
561        pub waited_for_initial_prev_token: bool,
562    }
563
564    impl RoomEventCacheState {
565        /// Create a new state, or reload it from storage if it's been enabled.
566        ///
567        /// Not all events are going to be loaded. Only a portion of them. The
568        /// [`RoomEvents`] relies on a [`LinkedChunk`] to store all events. Only
569        /// the last chunk will be loaded. It means the events are loaded from
570        /// the most recent to the oldest. To load more events, see
571        /// [`Self::load_more_events_backwards`].
572        ///
573        /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
574        pub async fn new(
575            room_id: OwnedRoomId,
576            store: Arc<OnceCell<EventCacheStoreLock>>,
577        ) -> Result<Self, EventCacheError> {
578            let (events, deduplicator) = if let Some(store) = store.get() {
579                let store_lock = store.lock().await?;
580
581                let linked_chunk = match store_lock
582                    .load_last_chunk(&room_id)
583                    .await
584                    .map_err(EventCacheError::from)
585                    .and_then(|(last_chunk, chunk_identifier_generator)| {
586                        lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
587                            .map_err(EventCacheError::from)
588                    }) {
589                    Ok(linked_chunk) => linked_chunk,
590
591                    Err(err) => {
592                        error!("error when reloading a linked chunk from memory: {err}");
593
594                        // Clear storage for this room.
595                        store_lock
596                            .handle_linked_chunk_updates(&room_id, vec![Update::Clear])
597                            .await?;
598
599                        // Restart with an empty linked chunk.
600                        None
601                    }
602                };
603
604                (
605                    RoomEvents::with_initial_linked_chunk(linked_chunk),
606                    Deduplicator::new_store_based(room_id.clone(), store.clone()),
607                )
608            } else {
609                (RoomEvents::default(), Deduplicator::new_memory_based())
610            };
611
612            Ok(Self {
613                room: room_id,
614                store,
615                events,
616                deduplicator,
617                waited_for_initial_prev_token: false,
618            })
619        }
620
621        /// Deduplicate `events` considering all events in `Self::events`.
622        ///
623        /// The returned tuple contains:
624        /// - all events (duplicated or not) with an ID
625        /// - all the duplicated event IDs
626        /// - a boolean indicating all events (at least one) are duplicates.
627        ///
628        /// This last boolean is useful to know whether we need to store a
629        /// previous-batch token (gap) we received from a server-side
630        /// request (sync or back-pagination), or if we should
631        /// *not* store it.
632        ///
633        /// Since there can be empty back-paginations with a previous-batch
634        /// token (that is, they don't contain any events), we need to
635        /// make sure that there is *at least* one new event that has
636        /// been added. Otherwise, we might conclude something wrong
637        /// because a subsequent back-pagination might
638        /// return non-duplicated events.
639        ///
640        /// If we had already seen all the duplicated events that we're trying
641        /// to add, then it would be wasteful to store a previous-batch
642        /// token, or even touch the linked chunk: we would repeat
643        /// back-paginations for events that we have already seen, and
644        /// possibly misplace them. And we should not be missing
645        /// events either: the already-known events would have their own
646        /// previous-batch token (it might already be consumed).
647        pub async fn collect_valid_and_duplicated_events(
648            &mut self,
649            events: Vec<Event>,
650        ) -> Result<(Vec<Event>, Vec<OwnedEventId>, bool), EventCacheError> {
651            let (events, duplicated_event_ids) =
652                self.deduplicator.filter_duplicate_events(events, &self.events).await?;
653
654            let all_duplicates = !events.is_empty() && events.len() == duplicated_event_ids.len();
655
656            Ok((events, duplicated_event_ids, all_duplicates))
657        }
658
659        /// Load more events backwards if the last chunk is **not** a gap.
660        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
661        pub(in super::super) async fn load_more_events_backwards(
662            &mut self,
663        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
664            let Some(store) = self.store.get() else {
665                // No store: no events to insert. Pretend the caller has to act as if a gap was
666                // present.
667                return Ok(LoadMoreEventsBackwardsOutcome::Gap);
668            };
669
670            // If any in-memory chunk is a gap, don't load more events, and let the caller
671            // resolve the gap.
672            if self.events.chunks().any(|chunk| chunk.is_gap()) {
673                return Ok(LoadMoreEventsBackwardsOutcome::Gap);
674            }
675
676            // Because `first_chunk` is `not `Send`, get this information before the
677            // `.await` point, so that this `Future` can implement `Send`.
678            let first_chunk_identifier =
679                self.events.chunks().next().expect("a linked chunk is never empty").identifier();
680
681            let room_id = &self.room;
682            let store = store.lock().await?;
683
684            // The first chunk is not a gap, we can load its previous chunk.
685            let new_first_chunk =
686                match store.load_previous_chunk(room_id, first_chunk_identifier).await {
687                    Ok(Some(new_first_chunk)) => {
688                        // All good, let's continue with this chunk.
689                        new_first_chunk
690                    }
691                    Ok(None) => {
692                        // No previous chunk: no events to insert. Better, it means we've reached
693                        // the start of the timeline!
694                        return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
695                    }
696                    Err(err) => {
697                        error!("error when loading the previous chunk of a linked chunk: {err}");
698
699                        // Clear storage for this room.
700                        store.handle_linked_chunk_updates(room_id, vec![Update::Clear]).await?;
701
702                        // Return the error.
703                        return Err(err.into());
704                    }
705                };
706
707            let events = match &new_first_chunk.content {
708                ChunkContent::Gap(_) => None,
709                ChunkContent::Items(events) => Some(events.clone()),
710            };
711
712            if let Err(err) = self.events.insert_new_chunk_as_first(new_first_chunk) {
713                error!("error when inserting the previous chunk into its linked chunk: {err}");
714
715                // Clear storage for this room.
716                store.handle_linked_chunk_updates(room_id, vec![Update::Clear]).await?;
717
718                // Return the error.
719                return Err(err.into());
720            };
721
722            // ⚠️ Let's not propagate the updates to the store! We already have these data
723            // in the store! Let's drain them.
724            let _ = self.events.updates().take();
725
726            // However, we want to get updates as `VectorDiff`s.
727            let updates_as_vector_diffs = self.events.updates_as_vector_diffs();
728
729            Ok(match events {
730                None => LoadMoreEventsBackwardsOutcome::Gap,
731                Some(events) => {
732                    LoadMoreEventsBackwardsOutcome::Events(events, updates_as_vector_diffs)
733                }
734            })
735        }
736        /// Removes the bundled relations from an event, if they were present.
737        ///
738        /// Only replaces the present if it contained bundled relations.
739        fn strip_relations_if_present<T>(event: &mut Raw<T>) {
740            // We're going to get rid of the `unsigned`/`m.relations` field, if it's
741            // present.
742            // Use a closure that returns an option so we can quickly short-circuit.
743            let mut closure = || -> Option<()> {
744                let mut val: serde_json::Value = event.deserialize_as().ok()?;
745                let unsigned = val.get_mut("unsigned")?;
746                let unsigned_obj = unsigned.as_object_mut()?;
747                if unsigned_obj.remove("m.relations").is_some() {
748                    *event = Raw::new(&val).ok()?.cast();
749                }
750                None
751            };
752            let _ = closure();
753        }
754
755        fn strip_relations_from_event(ev: &mut TimelineEvent) {
756            match &mut ev.kind {
757                TimelineEventKind::Decrypted(decrypted) => {
758                    // Remove all information about encryption info for
759                    // the bundled events.
760                    decrypted.unsigned_encryption_info = None;
761
762                    // Remove the `unsigned`/`m.relations` field, if needs be.
763                    Self::strip_relations_if_present(&mut decrypted.event);
764                }
765
766                TimelineEventKind::UnableToDecrypt { event, .. }
767                | TimelineEventKind::PlainText { event } => {
768                    Self::strip_relations_if_present(event);
769                }
770            }
771        }
772
773        /// Strips the bundled relations from a collection of events.
774        fn strip_relations_from_events(items: &mut [TimelineEvent]) {
775            for ev in items.iter_mut() {
776                Self::strip_relations_from_event(ev);
777            }
778        }
779
780        /// Propagate changes to the underlying storage.
781        #[instrument(skip_all)]
782        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
783            let mut updates = self.events.updates().take();
784
785            if updates.is_empty() {
786                return Ok(());
787            }
788
789            let Some(store) = self.store.get() else {
790                return Ok(());
791            };
792
793            trace!("propagating {} updates", updates.len());
794
795            // Strip relations from updates which insert or replace items.
796            for update in updates.iter_mut() {
797                match update {
798                    Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
799                    Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
800                    // Other update kinds don't involve adding new events.
801                    Update::NewItemsChunk { .. }
802                    | Update::NewGapChunk { .. }
803                    | Update::RemoveChunk(_)
804                    | Update::RemoveItem { .. }
805                    | Update::DetachLastItems { .. }
806                    | Update::StartReattachItems
807                    | Update::EndReattachItems
808                    | Update::Clear => {}
809                }
810            }
811
812            // Spawn a task to make sure that all the changes are effectively forwarded to
813            // the store, even if the call to this method gets aborted.
814            //
815            // The store cross-process locking involves an actual mutex, which ensures that
816            // storing updates happens in the expected order.
817
818            let store = store.clone();
819            let room_id = self.room.clone();
820
821            spawn(async move {
822                let store = store.lock().await?;
823
824                if let Err(err) = store.handle_linked_chunk_updates(&room_id, updates).await {
825                    error!("unable to handle linked chunk updates: {err}");
826                }
827
828                super::Result::Ok(())
829            })
830            .await
831            .expect("joining failed")?;
832
833            trace!("done propagating store changes");
834
835            Ok(())
836        }
837
838        /// Resets this data structure as if it were brand new.
839        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
840        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
841            self.events.reset();
842            self.propagate_changes().await?;
843            self.waited_for_initial_prev_token = false;
844
845            Ok(self.events.updates_as_vector_diffs())
846        }
847
848        /// Returns a read-only reference to the underlying events.
849        pub fn events(&self) -> &RoomEvents {
850            &self.events
851        }
852
853        /// Gives a temporary mutable handle to the underlying in-memory events,
854        /// and will propagate changes to the storage once done.
855        ///
856        /// Returns the output of the given callback, as well as updates to the
857        /// linked chunk, as vector diff, so the caller may propagate
858        /// such updates, if needs be.
859        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
860        pub async fn with_events_mut<O, F: FnOnce(&mut RoomEvents) -> O>(
861            &mut self,
862            func: F,
863        ) -> Result<(O, Vec<VectorDiff<TimelineEvent>>), EventCacheError> {
864            let output = func(&mut self.events);
865            self.propagate_changes().await?;
866            let updates_as_vector_diffs = self.events.updates_as_vector_diffs();
867            Ok((output, updates_as_vector_diffs))
868        }
869    }
870}
871
872pub(super) use private::RoomEventCacheState;
873
874#[cfg(test)]
875mod tests {
876    use std::sync::Arc;
877
878    use assert_matches::assert_matches;
879    use assert_matches2::assert_let;
880    use matrix_sdk_base::{
881        event_cache::{
882            store::{EventCacheStore as _, MemoryStore},
883            Gap,
884        },
885        linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
886        store::StoreConfig,
887        sync::{JoinedRoomUpdate, Timeline},
888    };
889    use matrix_sdk_common::deserialized_responses::TimelineEvent;
890    use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE, BOB};
891    use ruma::{
892        event_id,
893        events::{
894            relation::RelationType, room::message::RoomMessageEventContentWithoutRelation,
895            AnySyncMessageLikeEvent, AnySyncTimelineEvent,
896        },
897        room_id, user_id, RoomId,
898    };
899
900    use crate::test_utils::{client::MockClientBuilder, logged_in_client};
901
902    #[async_test]
903    async fn test_event_with_redaction_relation() {
904        let original_id = event_id!("$original");
905        let related_id = event_id!("$related");
906        let room_id = room_id!("!galette:saucisse.bzh");
907        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
908
909        assert_relations(
910            room_id,
911            f.text_msg("Original event").event_id(original_id).into(),
912            f.redaction(original_id).event_id(related_id).into(),
913            f,
914        )
915        .await;
916    }
917
918    #[async_test]
919    async fn test_event_with_edit_relation() {
920        let original_id = event_id!("$original");
921        let related_id = event_id!("$related");
922        let room_id = room_id!("!galette:saucisse.bzh");
923        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
924
925        assert_relations(
926            room_id,
927            f.text_msg("Original event").event_id(original_id).into(),
928            f.text_msg("* An edited event")
929                .edit(
930                    original_id,
931                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
932                )
933                .event_id(related_id)
934                .into(),
935            f,
936        )
937        .await;
938    }
939
940    #[async_test]
941    async fn test_event_with_reply_relation() {
942        let original_id = event_id!("$original");
943        let related_id = event_id!("$related");
944        let room_id = room_id!("!galette:saucisse.bzh");
945        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
946
947        assert_relations(
948            room_id,
949            f.text_msg("Original event").event_id(original_id).into(),
950            f.text_msg("A reply").reply_to(original_id).event_id(related_id).into(),
951            f,
952        )
953        .await;
954    }
955
956    #[async_test]
957    async fn test_event_with_thread_reply_relation() {
958        let original_id = event_id!("$original");
959        let related_id = event_id!("$related");
960        let room_id = room_id!("!galette:saucisse.bzh");
961        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
962
963        assert_relations(
964            room_id,
965            f.text_msg("Original event").event_id(original_id).into(),
966            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
967            f,
968        )
969        .await;
970    }
971
972    #[async_test]
973    async fn test_event_with_reaction_relation() {
974        let original_id = event_id!("$original");
975        let related_id = event_id!("$related");
976        let room_id = room_id!("!galette:saucisse.bzh");
977        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
978
979        assert_relations(
980            room_id,
981            f.text_msg("Original event").event_id(original_id).into(),
982            f.reaction(original_id, ":D").event_id(related_id).into(),
983            f,
984        )
985        .await;
986    }
987
988    #[async_test]
989    async fn test_event_with_poll_response_relation() {
990        let original_id = event_id!("$original");
991        let related_id = event_id!("$related");
992        let room_id = room_id!("!galette:saucisse.bzh");
993        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
994
995        assert_relations(
996            room_id,
997            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
998                .event_id(original_id)
999                .into(),
1000            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
1001            f,
1002        )
1003        .await;
1004    }
1005
1006    #[async_test]
1007    async fn test_event_with_poll_end_relation() {
1008        let original_id = event_id!("$original");
1009        let related_id = event_id!("$related");
1010        let room_id = room_id!("!galette:saucisse.bzh");
1011        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1012
1013        assert_relations(
1014            room_id,
1015            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
1016                .event_id(original_id)
1017                .into(),
1018            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
1019            f,
1020        )
1021        .await;
1022    }
1023
1024    #[async_test]
1025    async fn test_event_with_filtered_relationships() {
1026        let original_id = event_id!("$original");
1027        let related_id = event_id!("$related");
1028        let associated_related_id = event_id!("$recursive_related");
1029        let room_id = room_id!("!galette:saucisse.bzh");
1030        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1031
1032        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
1033        let related_event = event_factory
1034            .text_msg("* Edited event")
1035            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
1036            .event_id(related_id)
1037            .into();
1038        let associated_related_event =
1039            event_factory.redaction(related_id).event_id(associated_related_id).into();
1040
1041        let client = logged_in_client(None).await;
1042
1043        let event_cache = client.event_cache();
1044        event_cache.subscribe().unwrap();
1045
1046        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1047        let room = client.get_room(room_id).unwrap();
1048
1049        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1050
1051        // Save the original event.
1052        room_event_cache.save_event(original_event).await;
1053
1054        // Save the related event.
1055        room_event_cache.save_event(related_event).await;
1056
1057        // Save the associated related event, which redacts the related event.
1058        room_event_cache.save_event(associated_related_event).await;
1059
1060        let filter = Some(vec![RelationType::Replacement]);
1061        let (event, related_events) =
1062            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1063        // Fetched event is the right one.
1064        let cached_event_id = event.event_id().unwrap();
1065        assert_eq!(cached_event_id, original_id);
1066
1067        // There are both the related id and the associatively related id
1068        assert_eq!(related_events.len(), 2);
1069
1070        let related_event_id = related_events[0].event_id().unwrap();
1071        assert_eq!(related_event_id, related_id);
1072        let related_event_id = related_events[1].event_id().unwrap();
1073        assert_eq!(related_event_id, associated_related_id);
1074
1075        // Now we'll filter threads instead, there should be no related events
1076        let filter = Some(vec![RelationType::Thread]);
1077        let (event, related_events) =
1078            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1079        // Fetched event is the right one.
1080        let cached_event_id = event.event_id().unwrap();
1081        assert_eq!(cached_event_id, original_id);
1082        // No Thread related events found
1083        assert!(related_events.is_empty());
1084    }
1085
1086    #[async_test]
1087    async fn test_event_with_recursive_relation() {
1088        let original_id = event_id!("$original");
1089        let related_id = event_id!("$related");
1090        let associated_related_id = event_id!("$recursive_related");
1091        let room_id = room_id!("!galette:saucisse.bzh");
1092        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1093
1094        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
1095        let related_event = event_factory
1096            .text_msg("* Edited event")
1097            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
1098            .event_id(related_id)
1099            .into();
1100        let associated_related_event =
1101            event_factory.redaction(related_id).event_id(associated_related_id).into();
1102
1103        let client = logged_in_client(None).await;
1104
1105        let event_cache = client.event_cache();
1106        event_cache.subscribe().unwrap();
1107
1108        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1109        let room = client.get_room(room_id).unwrap();
1110
1111        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1112
1113        // Save the original event.
1114        room_event_cache.save_event(original_event).await;
1115
1116        // Save the related event.
1117        room_event_cache.save_event(related_event).await;
1118
1119        // Save the associated related event, which redacts the related event.
1120        room_event_cache.save_event(associated_related_event).await;
1121
1122        let (event, related_events) =
1123            room_event_cache.event_with_relations(original_id, None).await.unwrap();
1124        // Fetched event is the right one.
1125        let cached_event_id = event.event_id().unwrap();
1126        assert_eq!(cached_event_id, original_id);
1127
1128        // There are both the related id and the associatively related id
1129        assert_eq!(related_events.len(), 2);
1130
1131        let related_event_id = related_events[0].event_id().unwrap();
1132        assert_eq!(related_event_id, related_id);
1133        let related_event_id = related_events[1].event_id().unwrap();
1134        assert_eq!(related_event_id, associated_related_id);
1135    }
1136
1137    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1138    #[async_test]
1139    async fn test_write_to_storage() {
1140        use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1141
1142        let room_id = room_id!("!galette:saucisse.bzh");
1143        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1144
1145        let event_cache_store = Arc::new(MemoryStore::new());
1146
1147        let client = MockClientBuilder::new("http://localhost".to_owned())
1148            .store_config(
1149                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1150            )
1151            .build()
1152            .await;
1153
1154        let event_cache = client.event_cache();
1155
1156        // Don't forget to subscribe and like^W enable storage!
1157        event_cache.subscribe().unwrap();
1158        event_cache.enable_storage().unwrap();
1159
1160        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1161        let room = client.get_room(room_id).unwrap();
1162
1163        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1164
1165        // Propagate an update for a message and a prev-batch token.
1166        let timeline = Timeline {
1167            limited: true,
1168            prev_batch: Some("raclette".to_owned()),
1169            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
1170        };
1171
1172        room_event_cache
1173            .inner
1174            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1175            .await
1176            .unwrap();
1177
1178        let linked_chunk =
1179            from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1180                .unwrap()
1181                .unwrap();
1182
1183        assert_eq!(linked_chunk.chunks().count(), 3);
1184
1185        let mut chunks = linked_chunk.chunks();
1186
1187        // Invariant: there's always an empty items chunk at the beginning.
1188        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1189            assert_eq!(events.len(), 0)
1190        });
1191
1192        // Then we have the gap.
1193        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
1194            assert_eq!(gap.prev_token, "raclette");
1195        });
1196
1197        // Then we have the stored event.
1198        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1199            assert_eq!(events.len(), 1);
1200            let deserialized = events[0].raw().deserialize().unwrap();
1201            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
1202            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
1203        });
1204
1205        // That's all, folks!
1206        assert!(chunks.next().is_none());
1207    }
1208
1209    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1210    #[async_test]
1211    async fn test_write_to_storage_strips_bundled_relations() {
1212        use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1213        use ruma::events::BundledMessageLikeRelations;
1214
1215        let room_id = room_id!("!galette:saucisse.bzh");
1216        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1217
1218        let event_cache_store = Arc::new(MemoryStore::new());
1219
1220        let client = MockClientBuilder::new("http://localhost".to_owned())
1221            .store_config(
1222                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1223            )
1224            .build()
1225            .await;
1226
1227        let event_cache = client.event_cache();
1228
1229        // Don't forget to subscribe and like^W enable storage!
1230        event_cache.subscribe().unwrap();
1231        event_cache.enable_storage().unwrap();
1232
1233        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1234        let room = client.get_room(room_id).unwrap();
1235
1236        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1237
1238        // Propagate an update for a message with bundled relations.
1239        let mut relations = BundledMessageLikeRelations::new();
1240        relations.replace =
1241            Some(Box::new(f.text_msg("Hello, Kind Sir").sender(*ALICE).into_raw_sync()));
1242        let ev = f.text_msg("hey yo").sender(*ALICE).bundled_relations(relations).into_event();
1243
1244        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
1245
1246        room_event_cache
1247            .inner
1248            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1249            .await
1250            .unwrap();
1251
1252        // The in-memory linked chunk keeps the bundled relation.
1253        {
1254            let (events, _) = room_event_cache.subscribe().await;
1255
1256            assert_eq!(events.len(), 1);
1257
1258            let ev = events[0].raw().deserialize().unwrap();
1259            assert_let!(
1260                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
1261            );
1262
1263            let original = msg.as_original().unwrap();
1264            assert_eq!(original.content.body(), "hey yo");
1265            assert!(original.unsigned.relations.replace.is_some());
1266        }
1267
1268        // The one in storage does not.
1269        let linked_chunk =
1270            from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1271                .unwrap()
1272                .unwrap();
1273
1274        assert_eq!(linked_chunk.chunks().count(), 1);
1275
1276        let mut chunks = linked_chunk.chunks();
1277        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1278            assert_eq!(events.len(), 1);
1279
1280            let ev = events[0].raw().deserialize().unwrap();
1281            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
1282
1283            let original = msg.as_original().unwrap();
1284            assert_eq!(original.content.body(), "hey yo");
1285            assert!(original.unsigned.relations.replace.is_none());
1286        });
1287
1288        // That's all, folks!
1289        assert!(chunks.next().is_none());
1290    }
1291
1292    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1293    #[async_test]
1294    async fn test_clear() {
1295        use eyeball_im::VectorDiff;
1296        use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1297
1298        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
1299
1300        let room_id = room_id!("!galette:saucisse.bzh");
1301        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1302
1303        let event_cache_store = Arc::new(MemoryStore::new());
1304
1305        let event_id1 = event_id!("$1");
1306        let event_id2 = event_id!("$2");
1307
1308        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1309        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1310
1311        // Prefill the store with some data.
1312        event_cache_store
1313            .handle_linked_chunk_updates(
1314                room_id,
1315                vec![
1316                    // An empty items chunk.
1317                    Update::NewItemsChunk {
1318                        previous: None,
1319                        new: ChunkIdentifier::new(0),
1320                        next: None,
1321                    },
1322                    // A gap chunk.
1323                    Update::NewGapChunk {
1324                        previous: Some(ChunkIdentifier::new(0)),
1325                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
1326                        new: ChunkIdentifier::new(42),
1327                        next: None,
1328                        gap: Gap { prev_token: "comté".to_owned() },
1329                    },
1330                    // Another items chunk, non-empty this time.
1331                    Update::NewItemsChunk {
1332                        previous: Some(ChunkIdentifier::new(42)),
1333                        new: ChunkIdentifier::new(1),
1334                        next: None,
1335                    },
1336                    Update::PushItems {
1337                        at: Position::new(ChunkIdentifier::new(1), 0),
1338                        items: vec![ev1.clone()],
1339                    },
1340                    // And another items chunk, non-empty again.
1341                    Update::NewItemsChunk {
1342                        previous: Some(ChunkIdentifier::new(1)),
1343                        new: ChunkIdentifier::new(2),
1344                        next: None,
1345                    },
1346                    Update::PushItems {
1347                        at: Position::new(ChunkIdentifier::new(2), 0),
1348                        items: vec![ev2.clone()],
1349                    },
1350                ],
1351            )
1352            .await
1353            .unwrap();
1354
1355        let client = MockClientBuilder::new("http://localhost".to_owned())
1356            .store_config(
1357                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1358            )
1359            .build()
1360            .await;
1361
1362        let event_cache = client.event_cache();
1363
1364        // Don't forget to subscribe and like^W enable storage!
1365        event_cache.subscribe().unwrap();
1366        event_cache.enable_storage().unwrap();
1367
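        // Make the base client aware of the room (as joined), so that `get_room` below
        // returns it.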
1368        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1369        let room = client.get_room(room_id).unwrap();
1370
1371        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1372
1373        let (items, mut stream) = room_event_cache.subscribe().await;
1374
1375        // The room knows about some cached events.
1376        {
1377            // The chunk containing this event is not loaded yet.
1378            assert!(room_event_cache.event(event_id1).await.is_none());
1379            // The chunk containing this event **is** loaded.
1380            assert!(room_event_cache.event(event_id2).await.is_some());
1381
1382            // The reloaded room must contain only one event.
1383            assert_eq!(items.len(), 1);
1384            assert_eq!(items[0].event_id().unwrap(), event_id2);
1385
1386            assert!(stream.is_empty());
1387        }
1388
1389        // Let's load more chunks to get all events.
1390        {
1391            room_event_cache.pagination().run_backwards_once(20).await.unwrap();
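            // (No server endpoint is mocked here: with storage enabled, this
            // back-pagination can be satisfied by lazily loading the previous chunk,
            // which contains `ev1`, from the event cache store.)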
1392
1393            assert_let_timeout!(
1394                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1395            );
1396            assert_eq!(diffs.len(), 1);
1397            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: _ });
1398
1399            // The room knows about more cached events.
1400            assert!(room_event_cache.event(event_id1).await.is_some());
1401            assert!(room_event_cache.event(event_id2).await.is_some());
1402
1403            assert!(stream.is_empty());
1404        }
1405
1406        // After clearing,…
1407        room_event_cache.clear().await.unwrap();
1408
1409        //… we get an update that the content has been cleared.
1410        assert_let_timeout!(
1411            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1412        );
1413        assert_eq!(diffs.len(), 1);
1414        assert_let!(VectorDiff::Clear = &diffs[0]);
1415
1416        // The room event cache has forgotten about the events.
1417        assert!(room_event_cache.event(event_id1).await.is_none());
1418
1419        let (items, _) = room_event_cache.subscribe().await;
1420        assert!(items.is_empty());
1421
1422        // The event cache store too.
1423        let linked_chunk =
1424            from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1425                .unwrap()
1426                .unwrap();
1427
1428        // Note: while the event cache store could return `None` here, clearing it will
1429        // reset it to its initial form, maintaining the invariant that it
1430        // contains a single items chunk that's empty.
1431        assert_eq!(linked_chunk.num_items(), 0);
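        // As an extra illustration of the invariant described above (assuming it
        // holds as stated), the reset linked chunk consists of exactly one chunk,
        // and that chunk is an empty items chunk.
        assert_eq!(linked_chunk.chunks().count(), 1);
        assert_matches!(
            linked_chunk.chunks().next().unwrap().content(),
            ChunkContent::Items(events) => {
                assert!(events.is_empty());
            }
        );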
1432    }
1433
1434    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1435    #[async_test]
1436    async fn test_load_from_storage() {
1437        use eyeball_im::VectorDiff;
1438
1439        use super::RoomEventCacheUpdate;
1440        use crate::assert_let_timeout;
1441
1442        let room_id = room_id!("!galette:saucisse.bzh");
1443        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1444
1445        let event_cache_store = Arc::new(MemoryStore::new());
1446
1447        let event_id1 = event_id!("$1");
1448        let event_id2 = event_id!("$2");
1449
1450        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1451        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1452
1453        // Prefill the store with some data.
1454        event_cache_store
1455            .handle_linked_chunk_updates(
1456                room_id,
1457                vec![
1458                    // An empty items chunk.
1459                    Update::NewItemsChunk {
1460                        previous: None,
1461                        new: ChunkIdentifier::new(0),
1462                        next: None,
1463                    },
1464                    // A gap chunk.
1465                    Update::NewGapChunk {
1466                        previous: Some(ChunkIdentifier::new(0)),
1467                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
1468                        new: ChunkIdentifier::new(42),
1469                        next: None,
1470                        gap: Gap { prev_token: "cheddar".to_owned() },
1471                    },
1472                    // Another items chunk, non-empty this time.
1473                    Update::NewItemsChunk {
1474                        previous: Some(ChunkIdentifier::new(42)),
1475                        new: ChunkIdentifier::new(1),
1476                        next: None,
1477                    },
1478                    Update::PushItems {
1479                        at: Position::new(ChunkIdentifier::new(1), 0),
1480                        items: vec![ev1.clone()],
1481                    },
1482                    // And another items chunk, non-empty again.
1483                    Update::NewItemsChunk {
1484                        previous: Some(ChunkIdentifier::new(1)),
1485                        new: ChunkIdentifier::new(2),
1486                        next: None,
1487                    },
1488                    Update::PushItems {
1489                        at: Position::new(ChunkIdentifier::new(2), 0),
1490                        items: vec![ev2.clone()],
1491                    },
1492                ],
1493            )
1494            .await
1495            .unwrap();
1496
1497        let client = MockClientBuilder::new("http://localhost".to_owned())
1498            .store_config(
1499                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1500            )
1501            .build()
1502            .await;
1503
1504        let event_cache = client.event_cache();
1505
1506        // Don't forget to subscribe and like^W enable storage!
1507        event_cache.subscribe().unwrap();
1508        event_cache.enable_storage().unwrap();
1509
1510        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1511        let room = client.get_room(room_id).unwrap();
1512
1513        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1514
1515        let (items, mut stream) = room_event_cache.subscribe().await;
1516
1517        // The initial items contain one event because only the last chunk is loaded by
1518        // default.
1519        assert_eq!(items.len(), 1);
1520        assert_eq!(items[0].event_id().unwrap(), event_id2);
1521        assert!(stream.is_empty());
1522
1523        // The event cache knows only one event.
1524        assert!(room_event_cache.event(event_id1).await.is_none());
1525        assert!(room_event_cache.event(event_id2).await.is_some());
1526
1527        // Let's paginate to load more events.
1528        room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1529
1530        assert_let_timeout!(
1531            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1532        );
1533        assert_eq!(diffs.len(), 1);
1534        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: _ });
1535
1536        // The event cache knows about the two events now!
1537        assert!(room_event_cache.event(event_id1).await.is_some());
1538        assert!(room_event_cache.event(event_id2).await.is_some());
1539
1540        assert!(stream.is_empty());
1541
1542        // A new update with one of these events leads to deduplication.
1543        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
1544        room_event_cache
1545            .inner
1546            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1547            .await
1548            .unwrap();
1549
1550        // The stream doesn't report these changes *yet*. Use the items vector given
1551        // when subscribing to check that the items are at their new positions: the
1552        // duplicated item has been removed (so it's no longer the first element), and
1553        // it's been re-added at the back of the list.
1554        let (items, _stream) = room_event_cache.subscribe().await;
1555        assert_eq!(items.len(), 2);
1556        assert_eq!(items[0].event_id().unwrap(), event_id1);
1557        assert_eq!(items[1].event_id().unwrap(), event_id2);
1558    }
1559
1560    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1561    #[async_test]
1562    async fn test_load_from_storage_resilient_to_failure() {
1563        let room_id = room_id!("!fondue:patate.ch");
1564        let event_cache_store = Arc::new(MemoryStore::new());
1565
1566        let event = EventFactory::new()
1567            .room(room_id)
1568            .sender(user_id!("@ben:saucisse.bzh"))
1569            .text_msg("foo")
1570            .event_id(event_id!("$42"))
1571            .into_event();
1572
1573        // Prefill the store with invalid data: two chunks that form a cycle.
1574        event_cache_store
1575            .handle_linked_chunk_updates(
1576                room_id,
1577                vec![
1578                    Update::NewItemsChunk {
1579                        previous: None,
1580                        new: ChunkIdentifier::new(0),
1581                        next: None,
1582                    },
1583                    Update::PushItems {
1584                        at: Position::new(ChunkIdentifier::new(0), 0),
1585                        items: vec![event],
1586                    },
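                    // This chunk declares chunk 0 both as its predecessor and its
                    // successor, which creates the cycle that must make loading fail.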
1587                    Update::NewItemsChunk {
1588                        previous: Some(ChunkIdentifier::new(0)),
1589                        new: ChunkIdentifier::new(1),
1590                        next: Some(ChunkIdentifier::new(0)),
1591                    },
1592                ],
1593            )
1594            .await
1595            .unwrap();
1596
1597        let client = MockClientBuilder::new("http://localhost".to_owned())
1598            .store_config(
1599                StoreConfig::new("holder".to_owned()).event_cache_store(event_cache_store.clone()),
1600            )
1601            .build()
1602            .await;
1603
1604        let event_cache = client.event_cache();
1605
1606        // Don't forget to subscribe and like^W enable storage!
1607        event_cache.subscribe().unwrap();
1608        event_cache.enable_storage().unwrap();
1609
1610        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1611        let room = client.get_room(room_id).unwrap();
1612
1613        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1614
1615        let (items, _stream) = room_event_cache.subscribe().await;
1616
1617        // Because the persisted content was invalid, the room store is reset: there are
1618        // no events in the cache.
1619        assert!(items.is_empty());
1620
1621        // Storage doesn't contain anything. It would also be valid for it to
1622        // contain a single, initial empty items chunk.
1623        let raw_chunks = event_cache_store.load_all_chunks(room_id).await.unwrap();
1624        assert!(raw_chunks.is_empty());
1625    }
1626
1627    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1628    #[async_test]
1629    async fn test_no_useless_gaps() {
1630        let room_id = room_id!("!galette:saucisse.bzh");
1631
1632        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
1633
1634        let event_cache = client.event_cache();
1635        event_cache.subscribe().unwrap();
1636
1637        let has_storage = true; // for testing purposes only
1638        event_cache.enable_storage().unwrap();
1639
1640        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1641        let room = client.get_room(room_id).unwrap();
1642        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1643
1644        let f = EventFactory::new().room(room_id).sender(*ALICE);
1645
1646        // Propagate an update including a limited timeline with one message and a
1647        // prev-batch token.
1648        room_event_cache
1649            .inner
1650            .handle_joined_room_update(
1651                has_storage,
1652                JoinedRoomUpdate {
1653                    timeline: Timeline {
1654                        limited: true,
1655                        prev_batch: Some("raclette".to_owned()),
1656                        events: vec![f.text_msg("hey yo").into_event()],
1657                    },
1658                    ..Default::default()
1659                },
1660            )
1661            .await
1662            .unwrap();
1663
1664        {
1665            let state = room_event_cache.inner.state.read().await;
1666
1667            let mut num_gaps = 0;
1668            let mut num_events = 0;
1669
1670            for c in state.events().chunks() {
1671                match c.content() {
1672                    ChunkContent::Items(items) => num_events += items.len(),
1673                    ChunkContent::Gap(_) => num_gaps += 1,
1674                }
1675            }
1676
1677            // The gap must have been stored.
1678            assert_eq!(num_gaps, 1);
1679            assert_eq!(num_events, 1);
1680        }
1681
1682        // Now, propagate an update for another message; the timeline isn't limited
1683        // this time, so its prev-batch token must not create a new gap.
1684        room_event_cache
1685            .inner
1686            .handle_joined_room_update(
1687                has_storage,
1688                JoinedRoomUpdate {
1689                    timeline: Timeline {
1690                        limited: false,
1691                        prev_batch: Some("fondue".to_owned()),
1692                        events: vec![f.text_msg("sup").into_event()],
1693                    },
1694                    ..Default::default()
1695                },
1696            )
1697            .await
1698            .unwrap();
1699
1700        {
1701            let state = room_event_cache.inner.state.read().await;
1702
1703            let mut num_gaps = 0;
1704            let mut num_events = 0;
1705
1706            for c in state.events().chunks() {
1707                match c.content() {
1708                    ChunkContent::Items(items) => num_events += items.len(),
1709                    ChunkContent::Gap(gap) => {
1710                        assert_eq!(gap.prev_token, "raclette");
1711                        num_gaps += 1;
1712                    }
1713                }
1714            }
1715
1716            // There's only the previous gap, no new ones.
1717            assert_eq!(num_gaps, 1);
1718            assert_eq!(num_events, 2);
1719        }
1720    }
1721
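    /// Check the behavior of `event_with_relations`: after saving `original_event`,
    /// an unrelated event, and `related_event` into the room's event cache, it must
    /// return the original event together with only the related event.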
1722    async fn assert_relations(
1723        room_id: &RoomId,
1724        original_event: TimelineEvent,
1725        related_event: TimelineEvent,
1726        event_factory: EventFactory,
1727    ) {
1728        let client = logged_in_client(None).await;
1729
1730        let event_cache = client.event_cache();
1731        event_cache.subscribe().unwrap();
1732
1733        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1734        let room = client.get_room(room_id).unwrap();
1735
1736        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1737
1738        // Save the original event.
1739        let original_event_id = original_event.event_id().unwrap();
1740        room_event_cache.save_event(original_event).await;
1741
1742        // Save an unrelated event to check it's not in the related events list.
1743        let unrelated_id = event_id!("$2");
1744        room_event_cache
1745            .save_event(event_factory.text_msg("An unrelated event").event_id(unrelated_id).into())
1746            .await;
1747
1748        // Save the related event.
1749        let related_id = related_event.event_id().unwrap();
1750        room_event_cache.save_event(related_event).await;
1751
1752        let (event, related_events) =
1753            room_event_cache.event_with_relations(&original_event_id, None).await.unwrap();
1754        // Fetched event is the right one.
1755        let cached_event_id = event.event_id().unwrap();
1756        assert_eq!(cached_event_id, original_event_id);
1757
1758        // Only the actually related event is in the list of related events.
1759        assert_eq!(related_events.len(), 1);
1760        let related_event_id = related_events[0].event_id().unwrap();
1761        assert_eq!(related_event_id, related_id);
1762    }
1763}