matrix_sdk/event_cache/room/mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All event cache types for a single room.
16
17use std::{
18    collections::BTreeMap,
19    fmt,
20    ops::{Deref, DerefMut},
21    sync::{
22        atomic::{AtomicUsize, Ordering},
23        Arc,
24    },
25};
26
27use events::{sort_positions_descending, Gap};
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31    deserialized_responses::{AmbiguityChange, TimelineEvent},
32    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
33};
34use ruma::{
35    events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
36    serde::Raw,
37    EventId, OwnedEventId, OwnedRoomId,
38};
39use tokio::sync::{
40    broadcast::{Receiver, Sender},
41    mpsc, Notify, RwLock,
42};
43use tracing::{error, instrument, trace, warn};
44
45use super::{
46    deduplicator::DeduplicationOutcome, AllEventsCache, AutoShrinkChannelPayload, EventsOrigin,
47    Result, RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
48};
49use crate::{client::WeakClient, room::WeakRoom};
50
51pub(super) mod events;
52
53/// A subset of an event cache, for a room.
54///
55/// Cloning is shallow, and thus is cheap to do.
56#[derive(Clone)]
57pub struct RoomEventCache {
58    pub(super) inner: Arc<RoomEventCacheInner>,
59}
60
61impl fmt::Debug for RoomEventCache {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        f.debug_struct("RoomEventCache").finish_non_exhaustive()
64    }
65}
66
67/// Thin wrapper for a room event cache listener, so as to trigger side-effects
68/// when all listeners are gone.
69#[allow(missing_debug_implementations)]
70pub struct RoomEventCacheListener {
71    /// Underlying receiver of the room event cache's updates.
72    recv: Receiver<RoomEventCacheUpdate>,
73
74    /// To which room are we listening?
75    room_id: OwnedRoomId,
76
77    /// Sender to the auto-shrink channel.
78    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
79
80    /// Shared instance of the auto-shrinker.
81    listener_count: Arc<AtomicUsize>,
82}
83
84impl Drop for RoomEventCacheListener {
85    fn drop(&mut self) {
86        let previous_listener_count = self.listener_count.fetch_sub(1, Ordering::SeqCst);
87
88        trace!("dropping a room event cache listener; previous count: {previous_listener_count}");
89
90        if previous_listener_count == 1 {
91            // We were the last instance of the listener; let the auto-shrinker know by
92            // notifying it of our room id.
93
94            let mut room_id = self.room_id.clone();
95
96            // Try to send without waiting for channel capacity, and retry in a spin-loop
97            // if it fails (until a maximum number of attempts is reached, or
98            // the send was successful). The channel shouldn't be super busy in
99            // general, so this should resolve quickly enough.
100
101            let mut num_attempts = 0;
102
103            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
104                num_attempts += 1;
105
106                if num_attempts > 1024 {
107                    // If we've tried too many times, just give up with a warning; after all, this
108                    // is only an optimization.
109                    warn!("couldn't send notification to the auto-shrink channel after 1024 attempts; giving up");
110                    return;
111                }
112
113                match err {
114                    mpsc::error::TrySendError::Full(stolen_room_id) => {
115                        room_id = stolen_room_id;
116                    }
117                    mpsc::error::TrySendError::Closed(_) => return,
118                }
119            }
120
121            trace!("sent notification to the parent channel that we were the last listener");
122        }
123    }
124}
125
126impl Deref for RoomEventCacheListener {
127    type Target = Receiver<RoomEventCacheUpdate>;
128
129    fn deref(&self) -> &Self::Target {
130        &self.recv
131    }
132}
133
134impl DerefMut for RoomEventCacheListener {
135    fn deref_mut(&mut self) -> &mut Self::Target {
136        &mut self.recv
137    }
138}
139
140impl RoomEventCache {
141    /// Create a new [`RoomEventCache`] using the given room and store.
142    pub(super) fn new(
143        client: WeakClient,
144        state: RoomEventCacheState,
145        pagination_status: SharedObservable<RoomPaginationStatus>,
146        room_id: OwnedRoomId,
147        all_events_cache: Arc<RwLock<AllEventsCache>>,
148        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
149    ) -> Self {
150        Self {
151            inner: Arc::new(RoomEventCacheInner::new(
152                client,
153                state,
154                pagination_status,
155                room_id,
156                all_events_cache,
157                auto_shrink_sender,
158            )),
159        }
160    }
161
162    /// Subscribe to this room's updates, after getting the initial list of
163    /// events.
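    ///
    /// A minimal usage sketch (not part of the original docs; the public
    /// re-export path of [`RoomEventCache`] is assumed):
    ///
    /// ```no_run
    /// # use matrix_sdk::event_cache::RoomEventCache;
    /// # async fn example(room_event_cache: RoomEventCache) {
    /// let (_initial_events, mut listener) = room_event_cache.subscribe().await;
    ///
    /// // `_initial_events` holds what is currently loaded for the room; further
    /// // `RoomEventCacheUpdate`s are delivered through the listener.
    /// while let Ok(_update) = listener.recv().await {
    ///     // React to timeline diffs, ephemeral events, read-marker moves, etc.
    /// }
    /// # }
    /// ```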
164    pub async fn subscribe(&self) -> (Vec<TimelineEvent>, RoomEventCacheListener) {
165        let state = self.inner.state.read().await;
166        let events = state.events().events().map(|(_position, item)| item.clone()).collect();
167
168        let previous_listener_count = state.listener_count.fetch_add(1, Ordering::SeqCst);
169        trace!("added a room event cache listener; new count: {}", previous_listener_count + 1);
170
171        let recv = self.inner.sender.subscribe();
172        let listener = RoomEventCacheListener {
173            recv,
174            room_id: self.inner.room_id.clone(),
175            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
176            listener_count: state.listener_count.clone(),
177        };
178
179        (events, listener)
180    }
181
182    /// Return a [`RoomPagination`] API object useful for running
183    /// back-pagination queries in the current room.
184    pub fn pagination(&self) -> RoomPagination {
185        RoomPagination { inner: self.inner.clone() }
186    }
187
188    /// Try to find an event by id in this room.
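    ///
    /// A sketch of a lookup by event ID (not part of the original docs; the
    /// re-export paths are assumed):
    ///
    /// ```no_run
    /// # use matrix_sdk::{event_cache::RoomEventCache, ruma::event_id};
    /// # async fn example(room_event_cache: RoomEventCache) {
    /// if let Some(_event) = room_event_cache.event(event_id!("$target:example.org")).await {
    ///     // The event was found in the linked chunk, in storage, or in the
    ///     // temporary `AllEventsCache`.
    /// }
    /// # }
    /// ```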
189    pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
190        // Search in all loaded or stored events.
191        let Ok(maybe_position_and_event) = self.inner.state.read().await.find_event(event_id).await
192        else {
193            error!("Failed to find the event");
194
195            return None;
196        };
197
198        // Search in `AllEventsCache` for known events that are not stored.
199        if let Some(event) = maybe_position_and_event.map(|(_location, _position, event)| event) {
200            Some(event)
201        } else if let Some((room_id, event)) =
202            self.inner.all_events.read().await.events.get(event_id).cloned()
203        {
204            (room_id == self.inner.room_id).then_some(event)
205        } else {
206            None
207        }
208    }
209
210    /// Try to find an event by id in this room, along with its related events.
211    ///
212    /// You can filter which types of related events to retrieve using
213    /// `filter`. `None` will retrieve related events of any type.
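    ///
    /// A sketch of a filtered lookup (not part of the original docs; the
    /// re-export paths are assumed):
    ///
    /// ```no_run
    /// # use matrix_sdk::{
    /// #     event_cache::RoomEventCache,
    /// #     ruma::{event_id, events::relation::RelationType},
    /// # };
    /// # async fn example(room_event_cache: RoomEventCache) {
    /// // Only collect edits and reactions related to the target event.
    /// let filter = Some(vec![RelationType::Replacement, RelationType::Annotation]);
    ///
    /// if let Some((_target, related)) = room_event_cache
    ///     .event_with_relations(event_id!("$target:example.org"), filter)
    ///     .await
    /// {
    ///     println!("found {} related events", related.len());
    /// }
    /// # }
    /// ```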
214    pub async fn event_with_relations(
215        &self,
216        event_id: &EventId,
217        filter: Option<Vec<RelationType>>,
218    ) -> Option<(TimelineEvent, Vec<TimelineEvent>)> {
219        let cache = self.inner.all_events.read().await;
220        if let Some((_, event)) = cache.events.get(event_id) {
221            let related_events = cache.collect_related_events(event_id, filter.as_deref());
222            Some((event.clone(), related_events))
223        } else {
224            None
225        }
226    }
227
228    /// Clear all the storage for this [`RoomEventCache`].
229    ///
230    /// This will get rid of all the events from the linked chunk and persisted
231    /// storage.
232    pub async fn clear(&self) -> Result<()> {
233        // Clear the linked chunk and persisted storage.
234        let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;
235
236        // Clear the (temporary) events mappings.
237        self.inner.all_events.write().await.clear();
238
239        // Notify observers about the update.
240        let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
241            diffs: updates_as_vector_diffs,
242            origin: EventsOrigin::Cache,
243        });
244
245        Ok(())
246    }
247
248    /// Save a single event in the event cache, for further retrieval with
249    /// [`Self::event`].
250    // TODO: This doesn't insert the event into the linked chunk. In the future
251    // there'll be no distinction between the linked chunk and the separate
252    // cache. There is a discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/3886.
253    pub(crate) async fn save_event(&self, event: TimelineEvent) {
254        if let Some(event_id) = event.event_id() {
255            let mut cache = self.inner.all_events.write().await;
256
257            cache.append_related_event(&event);
258            cache.events.insert(event_id, (self.inner.room_id.clone(), event));
259        } else {
260            warn!("couldn't save event without event id in the event cache");
261        }
262    }
263
264    /// Save some events in the event cache, for further retrieval with
265    /// [`Self::event`]. This function will save them using a single lock,
266    /// as opposed to [`Self::save_event`].
267    // TODO: This doesn't insert the event into the linked chunk. In the future
268    // there'll be no distinction between the linked chunk and the separate
269    // cache. There is a discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/3886.
270    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = TimelineEvent>) {
271        let mut cache = self.inner.all_events.write().await;
272        for event in events {
273            if let Some(event_id) = event.event_id() {
274                cache.append_related_event(&event);
275                cache.events.insert(event_id, (self.inner.room_id.clone(), event));
276            } else {
277                warn!("couldn't save event without event id in the event cache");
278            }
279        }
280    }
281
282    /// Return a nice debug string (a vector of lines) for the linked chunk of
283    /// events for this room.
284    pub async fn debug_string(&self) -> Vec<String> {
285        self.inner.state.read().await.events().debug_string()
286    }
287}
288
289/// The (non-cloneable) details of the `RoomEventCache`.
290pub(super) struct RoomEventCacheInner {
291    /// The room id for this room.
292    room_id: OwnedRoomId,
293
294    pub weak_room: WeakRoom,
295
296    /// Sender part for subscribers to this room.
297    pub sender: Sender<RoomEventCacheUpdate>,
298
299    /// State for this room's event cache.
300    pub state: RwLock<RoomEventCacheState>,
301
302    /// See comment of [`super::EventCacheInner::all_events`].
303    ///
304    /// This is shared between the [`super::EventCacheInner`] singleton and all
305    /// [`RoomEventCacheInner`] instances.
306    all_events: Arc<RwLock<AllEventsCache>>,
307
308    /// A notifier that we received a new pagination token.
309    pub pagination_batch_token_notifier: Notify,
310
311    pub pagination_status: SharedObservable<RoomPaginationStatus>,
312
313    /// Sender to the auto-shrink channel.
314    ///
315    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
316    /// more details.
317    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
318}
319
320impl RoomEventCacheInner {
321    /// Creates a new cache for a room, and subscribes to room updates, so as
322    /// to handle new timeline events.
323    fn new(
324        client: WeakClient,
325        state: RoomEventCacheState,
326        pagination_status: SharedObservable<RoomPaginationStatus>,
327        room_id: OwnedRoomId,
328        all_events_cache: Arc<RwLock<AllEventsCache>>,
329        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
330    ) -> Self {
331        let sender = Sender::new(32);
332        let weak_room = WeakRoom::new(client, room_id);
333        Self {
334            room_id: weak_room.room_id().to_owned(),
335            weak_room,
336            state: RwLock::new(state),
337            all_events: all_events_cache,
338            sender,
339            pagination_batch_token_notifier: Default::default(),
340            auto_shrink_sender,
341            pagination_status,
342        }
343    }
344
345    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
346        if account_data.is_empty() {
347            return;
348        }
349
350        let mut handled_read_marker = false;
351
352        trace!("Handling account data");
353
354        for raw_event in account_data {
355            match raw_event.deserialize() {
356                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
357                    // If duplicated, do not forward read marker multiple times
358                    // to avoid cluttering the update channel.
359                    if handled_read_marker {
360                        continue;
361                    }
362
363                    handled_read_marker = true;
364
365                    // Propagate to observers. (We ignore the error if there aren't any.)
366                    let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
367                        event_id: ev.content.event_id,
368                    });
369                }
370
371                Ok(_) => {
372                    // We're not interested in other room account data updates,
373                    // at this point.
374                }
375
376                Err(e) => {
377                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
378                    warn!(event_type, "Failed to deserialize account data: {e}");
379                }
380            }
381        }
382    }
383
384    #[instrument(skip_all, fields(room_id = %self.room_id))]
385    pub(super) async fn handle_joined_room_update(
386        &self,
387        has_storage: bool,
388        updates: JoinedRoomUpdate,
389    ) -> Result<()> {
390        self.handle_timeline(
391            has_storage,
392            updates.timeline,
393            updates.ephemeral.clone(),
394            updates.ambiguity_changes,
395        )
396        .await?;
397
398        self.handle_account_data(updates.account_data);
399
400        Ok(())
401    }
402
403    async fn handle_timeline(
404        &self,
405        has_storage: bool,
406        timeline: Timeline,
407        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
408        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
409    ) -> Result<()> {
410        if !has_storage && timeline.limited {
411            // Ideally we'd try to reconcile existing events against those received in the
412            // timeline, but we're not there yet. In the meantime, clear the
413            // items from the room. TODO: implement Smart Matching™.
414            trace!("limited timeline, clearing all previous events and pushing new events");
415
416            self.replace_all_events_by(
417                timeline.events,
418                timeline.prev_batch,
419                ephemeral_events,
420                ambiguity_changes,
421                EventsOrigin::Sync,
422            )
423            .await?;
424        } else {
425            // Add all the events to the backend.
426            trace!("adding new events");
427
428            // If we have storage, only keep the previous-batch token if we have a limited
429            // timeline. Otherwise, we know about all the events, and we don't need to
430            // back-paginate, so we wouldn't make use of the given previous-batch token.
431            //
432            // If we don't have storage, even if the timeline isn't limited, we may not have
433            // saved the previous events in any cache, so always keep the previous-batch
434            // token to be able to back-paginate and retrieve those events.
435            let prev_batch =
436                if has_storage && !timeline.limited { None } else { timeline.prev_batch };
437
438            let mut state = self.state.write().await;
439            self.append_events_locked(
440                &mut state,
441                timeline.events,
442                prev_batch,
443                ephemeral_events,
444                ambiguity_changes,
445            )
446            .await?;
447        }
448
449        Ok(())
450    }
451
452    #[instrument(skip_all, fields(room_id = %self.room_id))]
453    pub(super) async fn handle_left_room_update(
454        &self,
455        has_storage: bool,
456        updates: LeftRoomUpdate,
457    ) -> Result<()> {
458        self.handle_timeline(has_storage, updates.timeline, Vec::new(), updates.ambiguity_changes)
459            .await?;
460        Ok(())
461    }
462
463    /// Remove existing events, and append a set of events to the room cache and
464    /// storage, notifying observers.
465    pub(super) async fn replace_all_events_by(
466        &self,
467        timeline_events: Vec<TimelineEvent>,
468        prev_batch: Option<String>,
469        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
470        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
471        events_origin: EventsOrigin,
472    ) -> Result<()> {
473        // Acquire the lock.
474        let mut state = self.state.write().await;
475
476        // Reset the room's state.
477        let updates_as_vector_diffs = state.reset().await?;
478
479        // Propagate to observers.
480        let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
481            diffs: updates_as_vector_diffs,
482            origin: events_origin,
483        });
484
485        // Push the new events.
486        self.append_events_locked(
487            &mut state,
488            timeline_events,
489            prev_batch.clone(),
490            ephemeral_events,
491            ambiguity_changes,
492        )
493        .await?;
494
495        Ok(())
496    }
497
498    /// Append a set of events to the room cache and storage, notifying
499    /// observers.
500    ///
501    /// This is a private implementation. It must not be exposed publicly.
502    async fn append_events_locked(
503        &self,
504        state: &mut RoomEventCacheState,
505        timeline_events: Vec<TimelineEvent>,
506        prev_batch: Option<String>,
507        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
508        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
509    ) -> Result<()> {
510        if timeline_events.is_empty()
511            && prev_batch.is_none()
512            && ephemeral_events.is_empty()
513            && ambiguity_changes.is_empty()
514        {
515            return Ok(());
516        }
517
518        let (
519            DeduplicationOutcome {
520                all_events: events,
521                in_memory_duplicated_event_ids,
522                in_store_duplicated_event_ids,
523            },
524            all_duplicates,
525        ) = state.collect_valid_and_duplicated_events(timeline_events).await?;
526
527        // During a sync, when a duplicated event is found, the old event is removed and
528        // the new event is added.
529        //
530        // Let's remove the old events that are duplicated.
531        let timeline_event_diffs = if all_duplicates {
532            // No new events, thus no need to change the room events.
533            vec![]
534        } else {
535            // Remove the old duplicated events.
536            //
537            // We don't have to worry about the removals changing the positions of the
538            // existing events, because we are pushing all _new_
539            // `events` at the back.
540            let mut timeline_event_diffs = state
541                .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
542                .await?;
543
544            // Add the previous back-pagination token (if present), followed by the timeline
545            // events themselves.
546            let new_timeline_event_diffs = state
547                .with_events_mut(|room_events| {
548                    // If we only received duplicated events, we don't need to store the gap: if
549                    // there was a gap, we'd have received an unknown event at the tail of
550                    // the room's timeline (unless the server reordered sync events since the last
551                    // time we sync'd).
552                    if !all_duplicates {
553                        if let Some(prev_token) = &prev_batch {
554                            // As a tiny optimization: remove the last chunk if it's an empty
555                            // events chunk, as it's not useful to keep it before a gap.
556                            let prev_chunk_to_remove =
557                                room_events.rchunks().next().and_then(|chunk| {
558                                    (chunk.is_items() && chunk.num_items() == 0)
559                                        .then_some(chunk.identifier())
560                                });
561
562                            room_events.push_gap(Gap { prev_token: prev_token.clone() });
563
564                            if let Some(prev_chunk_to_remove) = prev_chunk_to_remove {
565                                room_events.remove_empty_chunk_at(prev_chunk_to_remove).expect(
566                                    "we just checked the chunk is there, and it's an empty item chunk",
567                                );
568                            }
569                        }
570                    }
571
572                    room_events.push_events(events.clone());
573
574                    events.clone()
575                })
576                .await?;
577
578            timeline_event_diffs.extend(new_timeline_event_diffs);
579
580            if prev_batch.is_some() && !all_duplicates {
581                // If there was a previous-batch token, and there's at least one non-duplicated
582                // new event, unload the chunks so the linked chunk only contains the last
583                // one; otherwise, there might be a valid gap in between, and
584                // observers may not render it (yet).
585                //
586                // We must do this *after* the above call to `.with_events_mut`, so the new
587                // events and gaps are properly persisted to storage.
588                if let Some(diffs) = state.shrink_to_last_chunk().await? {
589                    // Override the diffs with the new ones, as per `shrink_to_last_chunk`'s API
590                    // contract.
591                    timeline_event_diffs = diffs;
592                }
593            }
594
595            {
596                // Fill the AllEventsCache.
597                let mut all_events_cache = self.all_events.write().await;
598
599                for event in events {
600                    if let Some(event_id) = event.event_id() {
601                        all_events_cache.append_related_event(&event);
602                        all_events_cache
603                            .events
604                            .insert(event_id.to_owned(), (self.room_id.clone(), event));
605                    }
606                }
607            }
608
609            timeline_event_diffs
610        };
611
612        // Now that all events have been added, we can trigger the
613        // `pagination_token_notifier`.
614        if prev_batch.is_some() {
615            self.pagination_batch_token_notifier.notify_one();
616        }
617
618        // The order of `RoomEventCacheUpdate`s is **really** important here.
619        {
620            if !timeline_event_diffs.is_empty() {
621                let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
622                    diffs: timeline_event_diffs,
623                    origin: EventsOrigin::Sync,
624                });
625            }
626
627            if !ephemeral_events.is_empty() {
628                let _ = self
629                    .sender
630                    .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
631            }
632
633            if !ambiguity_changes.is_empty() {
634                let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
635            }
636        }
637
638        Ok(())
639    }
640}
641
642/// Internal type to represent the output of
643/// [`RoomEventCacheState::load_more_events_backwards`].
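///
/// A sketch of how a back-pagination routine might dispatch on this outcome
/// (illustrative only, not the actual pagination code):
///
/// ```ignore
/// match state.load_more_events_backwards().await? {
///     LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
///         // Run a back-pagination request, using `prev_token` (if any) as the
///         // starting point.
///     }
///     LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
///         // Nothing left to paginate.
///     }
///     LoadMoreEventsBackwardsOutcome::Events { timeline_event_diffs, reached_start, .. } => {
///         // Forward the diffs to observers, and report whether the start of
///         // the timeline has been reached.
///     }
///     LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
///         // Wait for sync to deliver an initial previous-batch token, then retry.
///     }
/// }
/// ```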
644#[derive(Debug)]
645pub(super) enum LoadMoreEventsBackwardsOutcome {
646    /// A gap has been inserted.
647    Gap {
648        /// The previous batch token to be used as the "end" parameter in the
649        /// back-pagination request.
650        prev_token: Option<String>,
651    },
652
653    /// The start of the timeline has been reached.
654    StartOfTimeline,
655
656    /// Events have been inserted.
657    Events {
658        events: Vec<TimelineEvent>,
659        timeline_event_diffs: Vec<VectorDiff<TimelineEvent>>,
660        reached_start: bool,
661    },
662
663    /// The caller must wait for the initial previous-batch token, and retry.
664    WaitForInitialPrevToken,
665}
666
667// Use a private module to hide `events` from this parent module.
668mod private {
669    use std::sync::{atomic::AtomicUsize, Arc};
670
671    use eyeball::SharedObservable;
672    use eyeball_im::VectorDiff;
673    use matrix_sdk_base::{
674        apply_redaction,
675        deserialized_responses::{TimelineEvent, TimelineEventKind},
676        event_cache::{store::EventCacheStoreLock, Event, Gap},
677        linked_chunk::{lazy_loader, ChunkContent, ChunkIdentifierGenerator, Position, Update},
678    };
679    use matrix_sdk_common::executor::spawn;
680    use once_cell::sync::OnceCell;
681    use ruma::{
682        events::{
683            room::redaction::SyncRoomRedactionEvent, AnySyncTimelineEvent, MessageLikeEventType,
684        },
685        serde::Raw,
686        EventId, OwnedEventId, OwnedRoomId, RoomVersionId,
687    };
688    use tracing::{debug, error, instrument, trace, warn};
689
690    use super::{
691        super::{
692            deduplicator::{DeduplicationOutcome, Deduplicator},
693            EventCacheError,
694        },
695        events::RoomEvents,
696        sort_positions_descending, EventLocation, LoadMoreEventsBackwardsOutcome,
697    };
698    use crate::event_cache::RoomPaginationStatus;
699
700    /// State for a single room's event cache.
701    ///
702    /// This contains all the inner mutable states that ought to be updated at
703    /// the same time.
704    pub struct RoomEventCacheState {
705        /// The room this state relates to.
706        room: OwnedRoomId,
707
708        /// The room version for this room.
709        room_version: RoomVersionId,
710
711        /// Reference to the underlying backing store.
712        ///
713        /// Set to `None` if the room shouldn't read the linked chunk from
714        /// storage, and shouldn't store updates to storage.
715        store: Arc<OnceCell<EventCacheStoreLock>>,
716
717        /// The events of the room.
718        events: RoomEvents,
719
720        /// The events deduplicator instance to help finding duplicates.
721        deduplicator: Deduplicator,
722
723        /// Have we ever waited for a previous-batch-token to come from sync, in
724        /// the context of pagination? We do this at most once per room,
725        /// the first time we try to run backward pagination. We reset
726        /// that upon clearing the timeline events.
727        pub waited_for_initial_prev_token: bool,
728
729        pagination_status: SharedObservable<RoomPaginationStatus>,
730
731        /// An atomic count of the current number of listeners of the
732        /// [`super::RoomEventCache`].
733        pub(super) listener_count: Arc<AtomicUsize>,
734    }
735
736    impl RoomEventCacheState {
737        /// Create a new state, or reload it from storage if it's been enabled.
738        ///
739        /// Not all events are going to be loaded. Only a portion of them. The
740        /// [`RoomEvents`] relies on a [`LinkedChunk`] to store all events. Only
741        /// the last chunk will be loaded. It means the events are loaded from
742        /// the most recent to the oldest. To load more events, see
743        /// [`Self::load_more_events_backwards`].
744        ///
745        /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
746        pub async fn new(
747            room_id: OwnedRoomId,
748            room_version: RoomVersionId,
749            store: Arc<OnceCell<EventCacheStoreLock>>,
750            pagination_status: SharedObservable<RoomPaginationStatus>,
751        ) -> Result<Self, EventCacheError> {
752            let (events, deduplicator) = if let Some(store) = store.get() {
753                let store_lock = store.lock().await?;
754
755                let linked_chunk = match store_lock
756                    .load_last_chunk(&room_id)
757                    .await
758                    .map_err(EventCacheError::from)
759                    .and_then(|(last_chunk, chunk_identifier_generator)| {
760                        lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
761                            .map_err(EventCacheError::from)
762                    }) {
763                    Ok(linked_chunk) => linked_chunk,
764
765                    Err(err) => {
766                        error!("error when reloading a linked chunk from storage: {err}");
767
768                        // Clear storage for this room.
769                        store_lock
770                            .handle_linked_chunk_updates(&room_id, vec![Update::Clear])
771                            .await?;
772
773                        // Restart with an empty linked chunk.
774                        None
775                    }
776                };
777
778                (
779                    RoomEvents::with_initial_linked_chunk(linked_chunk),
780                    Deduplicator::new_store_based(room_id.clone(), store.clone()),
781                )
782            } else {
783                (RoomEvents::default(), Deduplicator::new_memory_based())
784            };
785
786            Ok(Self {
787                room: room_id,
788                room_version,
789                store,
790                events,
791                deduplicator,
792                waited_for_initial_prev_token: false,
793                listener_count: Default::default(),
794                pagination_status,
795            })
796        }
797
798        /// Deduplicate `events` considering all events in `Self::events`.
799        ///
800        /// The returned tuple contains:
801        /// - all events (duplicated or not) with an ID
802        /// - all the duplicated event IDs with their position,
803        /// - a boolean indicating whether all the events (at least one) are duplicates.
804        ///
805        /// This last boolean is useful to know whether we need to store a
806        /// previous-batch token (gap) we received from a server-side
807        /// request (sync or back-pagination), or if we should
808        /// *not* store it.
809        ///
810        /// Since there can be empty back-paginations with a previous-batch
811        /// token (that is, they don't contain any events), we need to
812        /// make sure that there is *at least* one new event that has
813        /// been added. Otherwise, we might conclude something wrong
814        /// because a subsequent back-pagination might
815        /// return non-duplicated events.
816        ///
817        /// If we had already seen all the duplicated events that we're trying
818        /// to add, then it would be wasteful to store a previous-batch
819        /// token, or even touch the linked chunk: we would repeat
820        /// back-paginations for events that we have already seen, and
821        /// possibly misplace them. And we should not be missing
822        /// events either: the already-known events would have their own
823        /// previous-batch token (it might already be consumed).
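        ///
        /// A sketch of the intended call pattern (illustrative, mirroring how
        /// sync handling uses it):
        ///
        /// ```ignore
        /// let (outcome, all_duplicates) =
        ///     state.collect_valid_and_duplicated_events(sync_events).await?;
        ///
        /// if all_duplicates {
        ///     // Every event was already known: don't touch the linked chunk, and
        ///     // don't store the previous-batch token that came with them.
        /// } else {
        ///     // Remove the stale duplicates, then push `outcome.all_events` at
        ///     // the back of the linked chunk.
        /// }
        /// ```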
824        pub async fn collect_valid_and_duplicated_events(
825            &mut self,
826            events: Vec<Event>,
827        ) -> Result<(DeduplicationOutcome, bool), EventCacheError> {
828            let deduplication_outcome =
829                self.deduplicator.filter_duplicate_events(events, &self.events).await?;
830
831            let number_of_events = deduplication_outcome.all_events.len();
832            let number_of_deduplicated_events =
833                deduplication_outcome.in_memory_duplicated_event_ids.len()
834                    + deduplication_outcome.in_store_duplicated_event_ids.len();
835
836            let all_duplicates =
837                number_of_events > 0 && number_of_events == number_of_deduplicated_events;
838
839            Ok((deduplication_outcome, all_duplicates))
840        }
841
842        /// Given a fully-loaded linked chunk with no gaps, return the
843        /// [`LoadMoreEventsBackwardsOutcome`] expected for this room's cache.
844        fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome {
845            // If we never received events for this room, this means we've never
846            // received a sync for that room, because every room must have at least a
847            // room creation event. Otherwise, we have reached the start of the
848            // timeline.
849            if self.events.events().next().is_some() {
850                // If there's at least one event, this means we've reached the start of the
851                // timeline, since the chunk is fully loaded.
852                trace!("chunk is fully loaded and non-empty: reached_start=true");
853                LoadMoreEventsBackwardsOutcome::StartOfTimeline
854            } else if !self.waited_for_initial_prev_token {
855                // There are no events. Since we haven't waited yet, wait for an initial previous-token.
856                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken
857            } else {
858                // Otherwise, we've already waited, *and* received no previous-batch token from
859                // the sync, *and* there are still no events in the fully-loaded
860                // chunk: start back-pagination from the end of the room.
861                LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
862            }
863        }
864
865        /// Load more events backwards if the last chunk is **not** a gap.
866        pub(in super::super) async fn load_more_events_backwards(
867            &mut self,
868        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
869            let Some(store) = self.store.get() else {
870                // No store to reload events from. Pretend a gap is present, so the caller
871                // acts accordingly. Limited syncs will always clear and push a gap, in this mode.
872                // There's no lazy-loading.
873
874                // Look for a gap in the in-memory chunk, iterating in reverse so as to get the
875                // most recent one.
876                if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
877                    return Ok(LoadMoreEventsBackwardsOutcome::Gap {
878                        prev_token: Some(prev_token),
879                    });
880                }
881
882                return Ok(self.conclude_load_more_for_fully_loaded_chunk());
883            };
884
885            // If any in-memory chunk is a gap, don't load more events, and let the caller
886            // resolve the gap.
887            if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
888                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
889            }
890
891            // Because `first_chunk` is not `Send`, get this information before the
892            // `.await` point, so that this `Future` can implement `Send`.
893            let first_chunk_identifier =
894                self.events.chunks().next().expect("a linked chunk is never empty").identifier();
895
896            let store = store.lock().await?;
897
898            // The first chunk is not a gap, we can load its previous chunk.
899            let new_first_chunk =
900                match store.load_previous_chunk(&self.room, first_chunk_identifier).await {
901                    Ok(Some(new_first_chunk)) => {
902                        // All good, let's continue with this chunk.
903                        new_first_chunk
904                    }
905
906                    Ok(None) => {
907                        // There's no previous chunk. The chunk is now fully-loaded. Conclude.
908                        return Ok(self.conclude_load_more_for_fully_loaded_chunk());
909                    }
910
911                    Err(err) => {
912                        error!("error when loading the previous chunk of a linked chunk: {err}");
913
914                        // Clear storage for this room.
915                        store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
916
917                        // Return the error.
918                        return Err(err.into());
919                    }
920                };
921
922            let chunk_content = new_first_chunk.content.clone();
923
924            // We've reached the start on disk if, and only if, there was no chunk prior to
925            // the one we just loaded.
926            //
927            // This value is correct if, and only if, it is used for a chunk content of kind
928            // `Items`.
929            let reached_start = new_first_chunk.previous.is_none();
930
931            if let Err(err) = self.events.insert_new_chunk_as_first(new_first_chunk) {
932                error!("error when inserting the previous chunk into its linked chunk: {err}");
933
934                // Clear storage for this room.
935                store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
936
937                // Return the error.
938                return Err(err.into());
939            };
940
941            // ⚠️ Let's not propagate the updates to the store! We already have these data
942            // in the store! Let's drain them.
943            let _ = self.events.store_updates().take();
944
945            // However, we want to get updates as `VectorDiff`s.
946            let timeline_event_diffs = self.events.updates_as_vector_diffs();
947
948            Ok(match chunk_content {
949                ChunkContent::Gap(gap) => {
950                    trace!("reloaded chunk from disk (gap)");
951                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
952                }
953
954                ChunkContent::Items(events) => {
955                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
956                    LoadMoreEventsBackwardsOutcome::Events {
957                        events,
958                        timeline_event_diffs,
959                        reached_start,
960                    }
961                }
962            })
963        }
964
965        /// If storage is enabled, unload all the chunks, then reload only the
966        /// last one.
967        ///
968        /// If storage is enabled, return a diff update that starts with a clear
969        /// of all events; as a result, the caller may override any
970        /// pending diff updates with the result of this function.
971        ///
972        /// Otherwise, return `None`.
973        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
974        pub(super) async fn shrink_to_last_chunk(
975            &mut self,
976        ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
977            let Some(store) = self.store.get() else {
978                // No need to do anything if there's no storage; we'll already reset the
979                // timeline after a limited response.
980                return Ok(None);
981            };
982
983            let store_lock = store.lock().await?;
984
985            // Attempt to load the last chunk.
986            let (last_chunk, chunk_identifier_generator) = match store_lock
987                .load_last_chunk(&self.room)
988                .await
989            {
990                Ok(pair) => pair,
991
992                Err(err) => {
993                    // If loading the last chunk failed, clear the entire linked chunk.
994                    error!("error when reloading a linked chunk from storage: {err}");
995
996                    // Clear storage for this room.
997                    store_lock.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
998
999                    // Restart with an empty linked chunk.
1000                    (None, ChunkIdentifierGenerator::new_from_scratch())
1001                }
1002            };
1003
1004            debug!("unloading the linked chunk, and resetting it to its last chunk");
1005
1006            // Remove all the chunks from the linked chunk, except for the last one, and
1007            // update the chunk identifier generator.
1008            if let Err(err) = self.events.replace_with(last_chunk, chunk_identifier_generator) {
1009                error!("error when replacing the linked chunk: {err}");
1010                return self.reset().await.map(Some);
1011            }
1012
1013            // Let pagination observers know that we may not have reached the start of the
1014            // timeline.
1015            // TODO: likely need to cancel any ongoing pagination.
1016            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1017
1018            // Don't propagate those updates to the store: we're only doing this for the
1019            // in-memory representation. Let's drain those store updates.
1020            let _ = self.events.store_updates().take();
1021
1022            // However, we want to get updates as `VectorDiff`s, for the external listeners.
1023            // Check we're respecting the contract defined in the doc comment.
1024            let diffs = self.events.updates_as_vector_diffs();
1025            assert!(matches!(diffs[0], VectorDiff::Clear));
1026
1027            Ok(Some(diffs))
1028        }
1029
1030        /// Automatically shrink the room if there are no listeners, as
1031        /// indicated by the atomic count of active listeners.
1032        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1033        pub(crate) async fn auto_shrink_if_no_listeners(
1034            &mut self,
1035        ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
1036            let listener_count = self.listener_count.load(std::sync::atomic::Ordering::SeqCst);
1037
1038            trace!(listener_count, "received request to auto-shrink");
1039
1040            if listener_count == 0 {
1041                // If there are no more listeners, we can shrink the
1042                // events data structure to its last chunk.
1043                self.shrink_to_last_chunk().await
1044            } else {
1045                Ok(None)
1046            }
1047        }
1048
1049        /// Removes the bundled relations from an event, if they were present.
1050        ///
1051        /// Only replaces the event if it contained bundled relations.
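        ///
        /// For instance (sketch), an event whose `unsigned` object is
        /// `{"m.relations": {...}, "age": 5}` ends up with `{"age": 5}`; events
        /// without the field are left untouched.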
1052        fn strip_relations_if_present<T>(event: &mut Raw<T>) {
1053            // We're going to get rid of the `unsigned`/`m.relations` field, if it's
1054            // present.
1055            // Use a closure that returns an option so we can quickly short-circuit.
1056            let mut closure = || -> Option<()> {
1057                let mut val: serde_json::Value = event.deserialize_as().ok()?;
1058                let unsigned = val.get_mut("unsigned")?;
1059                let unsigned_obj = unsigned.as_object_mut()?;
1060                if unsigned_obj.remove("m.relations").is_some() {
1061                    *event = Raw::new(&val).ok()?.cast();
1062                }
1063                None
1064            };
1065            let _ = closure();
1066        }
1067
1068        fn strip_relations_from_event(ev: &mut TimelineEvent) {
1069            match &mut ev.kind {
1070                TimelineEventKind::Decrypted(decrypted) => {
1071                    // Remove all information about encryption info for
1072                    // the bundled events.
1073                    decrypted.unsigned_encryption_info = None;
1074
1075                    // Remove the `unsigned`/`m.relations` field, if needs be.
1076                    Self::strip_relations_if_present(&mut decrypted.event);
1077                }
1078
1079                TimelineEventKind::UnableToDecrypt { event, .. }
1080                | TimelineEventKind::PlainText { event } => {
1081                    Self::strip_relations_if_present(event);
1082                }
1083            }
1084        }
1085
1086        /// Strips the bundled relations from a collection of events.
1087        fn strip_relations_from_events(items: &mut [TimelineEvent]) {
1088            for ev in items.iter_mut() {
1089                Self::strip_relations_from_event(ev);
1090            }
1091        }
1092
1093        /// Remove events by their position, in `RoomEvents` and in
1094        /// `EventCacheStore`.
1095        ///
1096        /// This method is purposely isolated because it must ensure that
1097        /// positions are sorted appropriately or it can be disastrous.
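        ///
        /// For instance, with two items at indices 2 and 5 of the same chunk,
        /// removing index 2 first shifts the item at index 5 down to 4, so the
        /// second removal would target the wrong item; removing in descending
        /// order (5, then 2) avoids that.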
1098        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1099        #[instrument(skip_all)]
1100        pub(crate) async fn remove_events(
1101            &mut self,
1102            in_memory_events: Vec<(OwnedEventId, Position)>,
1103            in_store_events: Vec<(OwnedEventId, Position)>,
1104        ) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
1105            // In-store events.
1106            if !in_store_events.is_empty() {
1107                let mut positions = in_store_events
1108                    .into_iter()
1109                    .map(|(_event_id, position)| position)
1110                    .collect::<Vec<_>>();
1111
1112                sort_positions_descending(&mut positions);
1113
1114                self.send_updates_to_store(
1115                    positions
1116                        .into_iter()
1117                        .map(|position| Update::RemoveItem { at: position })
1118                        .collect(),
1119                )
1120                .await?;
1121            }
1122
1123            // In-memory events.
1124            let timeline_event_diffs = if !in_memory_events.is_empty() {
1125                self.with_events_mut(|room_events| {
1126                    // `remove_events_by_position` sorts the positions by itself.
1127                    room_events
1128                        .remove_events_by_position(
1129                            in_memory_events
1130                                .into_iter()
1131                                .map(|(_event_id, position)| position)
1132                                .collect(),
1133                        )
1134                        .expect("failed to remove an event");
1135
1136                    vec![]
1137                })
1138                .await?
1139            } else {
1140                Vec::new()
1141            };
1142
1143            Ok(timeline_event_diffs)
1144        }
1145
1146        /// Propagate changes to the underlying storage.
1147        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1148            let updates = self.events.store_updates().take();
1149            self.send_updates_to_store(updates).await
1150        }
1151
1152        pub async fn send_updates_to_store(
1153            &mut self,
1154            mut updates: Vec<Update<TimelineEvent, Gap>>,
1155        ) -> Result<(), EventCacheError> {
1156            let Some(store) = self.store.get() else {
1157                return Ok(());
1158            };
1159
1160            if updates.is_empty() {
1161                return Ok(());
1162            }
1163
1164            // Strip relations from updates which insert or replace items.
1165            for update in updates.iter_mut() {
1166                match update {
1167                    Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
1168                    Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
1169                    // Other update kinds don't involve adding new events.
1170                    Update::NewItemsChunk { .. }
1171                    | Update::NewGapChunk { .. }
1172                    | Update::RemoveChunk(_)
1173                    | Update::RemoveItem { .. }
1174                    | Update::DetachLastItems { .. }
1175                    | Update::StartReattachItems
1176                    | Update::EndReattachItems
1177                    | Update::Clear => {}
1178                }
1179            }
1180
1181            // Spawn a task to make sure that all the changes are effectively forwarded to
1182            // the store, even if the call to this method gets aborted.
1183            //
1184            // The store cross-process locking involves an actual mutex, which ensures that
1185            // storing updates happens in the expected order.
1186
1187            let store = store.clone();
1188            let room_id = self.room.clone();
1189
1190            spawn(async move {
1191                let store = store.lock().await?;
1192
1193                trace!(?updates, "sending linked chunk updates to the store");
1194                store.handle_linked_chunk_updates(&room_id, updates).await?;
1195                trace!("linked chunk updates applied");
1196
1197                super::Result::Ok(())
1198            })
1199            .await
1200            .expect("joining failed")?;
1201
1202            Ok(())
1203        }
1204
1205        /// Reset this data structure as if it were brand new.
1206        ///
1207        /// Return a single diff update that is a clear of all events; as a
1208        /// result, the caller may override any pending diff updates
1209        /// with the result of this function.
1210        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1211        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
1212            self.events.reset();
1213
1214            self.propagate_changes().await?;
1215
1216            // Reset the pagination state too: pretend we never waited for the initial
1217            // prev-batch token, and indicate that we're not at the start of the
1218            // timeline, since we don't know about that anymore.
1219            self.waited_for_initial_prev_token = false;
1220            // TODO: likely must cancel any ongoing back-paginations too
1221            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1222
1223            let diff_updates = self.events.updates_as_vector_diffs();
1224
1225            // Ensure the contract defined in the doc comment is true:
1226            debug_assert_eq!(diff_updates.len(), 1);
1227            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1228
1229            Ok(diff_updates)
1230        }
1231
1232        /// Returns a read-only reference to the underlying events.
1233        pub fn events(&self) -> &RoomEvents {
1234            &self.events
1235        }
1236
1237        /// Find a single event in this room.
1238        ///
1239        /// It starts by looking into loaded events in `RoomEvents` before
1240        /// looking inside the storage if it is enabled.
1241        pub async fn find_event(
1242            &self,
1243            event_id: &EventId,
1244        ) -> Result<Option<(EventLocation, Position, TimelineEvent)>, EventCacheError> {
1245            let room_id = self.room.as_ref();
1246
1247            // There are supposedly fewer events loaded in memory than in the store. Let's
1248            // start by looking up in the `RoomEvents`.
1249            for (position, event) in self.events().revents() {
1250                if event.event_id().as_deref() == Some(event_id) {
1251                    return Ok(Some((EventLocation::Memory, position, event.clone())));
1252                }
1253            }
1254
1255            let Some(store) = self.store.get() else {
1256                // No store, event is not present.
1257                return Ok(None);
1258            };
1259
1260            let store = store.lock().await?;
1261
1262            Ok(store
1263                .find_event(room_id, event_id)
1264                .await?
1265                .map(|(position, event)| (EventLocation::Store, position, event)))
1266        }
1267
1268        /// Gives a temporary mutable handle to the underlying in-memory events,
1269        /// and will propagate changes to the storage once done.
1270        ///
1271        /// Returns the updates to the linked chunk, as vector diffs, so the
1272        /// caller may propagate such updates, if needs be.
1273        ///
1274        /// The function `func` takes a mutable reference to `RoomEvents`. It
1275        /// returns a set of events that will be post-processed. At the time of
1276        /// writing, all these events are passed to
1277        /// `Self::maybe_apply_new_redaction`.
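        ///
        /// A sketch of the intended use (mirroring how sync appends events;
        /// `new_events` is illustrative):
        ///
        /// ```ignore
        /// let diffs = state
        ///     .with_events_mut(|room_events| {
        ///         room_events.push_events(new_events.clone());
        ///         // Return the events that should be post-processed (e.g.
        ///         // checked for redactions to apply).
        ///         new_events
        ///     })
        ///     .await?;
        /// ```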
1278        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1279        #[instrument(skip_all, fields(room_id = %self.room))]
1280        pub async fn with_events_mut<F>(
1281            &mut self,
1282            func: F,
1283        ) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError>
1284        where
1285            F: FnOnce(&mut RoomEvents) -> Vec<TimelineEvent>,
1286        {
1287            let events_to_post_process = func(&mut self.events);
1288
1289            // Update the store before doing the post-processing.
1290            self.propagate_changes().await?;
1291
1292            for event in &events_to_post_process {
1293                self.maybe_apply_new_redaction(event).await?;
1294            }
1295
1296            // If we've never waited for an initial previous-batch token, and we now have at
1297            // least one gap in the chunk, no need to wait for a previous-batch token later.
1298            if !self.waited_for_initial_prev_token
1299                && self.events.chunks().any(|chunk| chunk.is_gap())
1300            {
1301                self.waited_for_initial_prev_token = true;
1302            }
1303
1304            let updates_as_vector_diffs = self.events.updates_as_vector_diffs();
1305
1306            Ok(updates_as_vector_diffs)
1307        }
1308
1309        /// If the given event is a redaction, try to retrieve the
1310        /// to-be-redacted event in the chunk, and replace it by the
1311        /// redacted form.
1312        #[instrument(skip_all)]
1313        async fn maybe_apply_new_redaction(
1314            &mut self,
1315            event: &Event,
1316        ) -> Result<(), EventCacheError> {
1317            let raw_event = event.raw();
1318
1319            // Do not deserialize the entire event if we aren't certain it's an
1320            // `m.room.redaction`. It saves a non-negligible amount of computation.
1321            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
1322                raw_event.get_field::<MessageLikeEventType>("type")
1323            else {
1324                return Ok(());
1325            };
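                // For reference, and as an illustration only, the serialized event we
                // are peeking at looks roughly like this (the exact placement of the
                // `redacts` field depends on the room version: top-level before room
                // v11, inside `content` from v11 onwards):
                //
                // {
                //   "type": "m.room.redaction",
                //   "redacts": "$target_event_id",
                //   "content": { "redacts": "$target_event_id" },
                //   ...
                // }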
1326
1327            // It is an `m.room.redaction`! We can now deserialize it entirely.
1328
1329            let Ok(AnySyncTimelineEvent::MessageLike(
1330                ruma::events::AnySyncMessageLikeEvent::RoomRedaction(redaction),
1331            )) = event.raw().deserialize()
1332            else {
1333                return Ok(());
1334            };
1335
1336            let Some(event_id) = redaction.redacts(&self.room_version) else {
1337                warn!("missing target event id from the redaction event");
1338                return Ok(());
1339            };
1340
1341            // Replace the to-be-redacted event with its redacted form, if we know about it.
1342            if let Some((location, position, target_event)) = self.find_event(event_id).await? {
1343                // Don't redact already redacted events.
1344                if let Ok(deserialized) = target_event.raw().deserialize() {
1345                    match deserialized {
1346                        AnySyncTimelineEvent::MessageLike(ev) => {
1347                            if ev.is_redacted() {
1348                                return Ok(());
1349                            }
1350                        }
1351                        AnySyncTimelineEvent::State(ev) => {
1352                            if ev.is_redacted() {
1353                                return Ok(());
1354                            }
1355                        }
1356                    }
1357                }
1358
1359                if let Some(redacted_event) = apply_redaction(
1360                    target_event.raw(),
1361                    event.raw().cast_ref::<SyncRoomRedactionEvent>(),
1362                    &self.room_version,
1363                ) {
1364                    let mut copy = target_event.clone();
1365
1366                    // It's safe to cast `redacted_event` here:
1367                    // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
1368                    //   when calling .raw(), so it's still one under the hood.
1369                    // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
1370                    copy.replace_raw(redacted_event.cast());
1371
1372                    match location {
1373                        EventLocation::Memory => {
1374                            self.events
1375                                .replace_event_at(position, copy)
1376                                .expect("should have been a valid position of an item");
1377                        }
1378                        EventLocation::Store => {
1379                            self.send_updates_to_store(vec![Update::ReplaceItem {
1380                                at: position,
1381                                item: copy,
1382                            }])
1383                            .await?;
1384                        }
1385                    }
1386                }
1387            } else {
1388                trace!("redacted event is missing from the linked chunk");
1389            }
1390
1391            // TODO: remove all related events too!
1392
1393            Ok(())
1394        }
1395    }
1396}
1397
1398/// An enum representing where an event has been found.
1399pub(super) enum EventLocation {
1400    /// Event lives in memory (and likely in the store!).
1401    Memory,
1402
1403    /// Event lives in the store only; it has not been loaded into memory yet.
1404    Store,
1405}
1406
1407pub(super) use private::RoomEventCacheState;
1408
1409#[cfg(test)]
1410mod tests {
1411    use std::sync::Arc;
1412
1413    use assert_matches::assert_matches;
1414    use assert_matches2::assert_let;
1415    use matrix_sdk_base::{
1416        event_cache::{
1417            store::{EventCacheStore as _, MemoryStore},
1418            Gap,
1419        },
1420        linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
1421        store::StoreConfig,
1422        sync::{JoinedRoomUpdate, Timeline},
1423    };
1424    use matrix_sdk_common::deserialized_responses::TimelineEvent;
1425    use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE, BOB};
1426    use ruma::{
1427        event_id,
1428        events::{
1429            relation::RelationType, room::message::RoomMessageEventContentWithoutRelation,
1430            AnySyncMessageLikeEvent, AnySyncTimelineEvent,
1431        },
1432        room_id, user_id, RoomId,
1433    };
1434
1435    use crate::test_utils::{client::MockClientBuilder, logged_in_client};
1436
1437    #[async_test]
1438    async fn test_event_with_redaction_relation() {
1439        let original_id = event_id!("$original");
1440        let related_id = event_id!("$related");
1441        let room_id = room_id!("!galette:saucisse.bzh");
1442        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1443
1444        assert_relations(
1445            room_id,
1446            f.text_msg("Original event").event_id(original_id).into(),
1447            f.redaction(original_id).event_id(related_id).into(),
1448            f,
1449        )
1450        .await;
1451    }
1452
1453    #[async_test]
1454    async fn test_event_with_edit_relation() {
1455        let original_id = event_id!("$original");
1456        let related_id = event_id!("$related");
1457        let room_id = room_id!("!galette:saucisse.bzh");
1458        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1459
1460        assert_relations(
1461            room_id,
1462            f.text_msg("Original event").event_id(original_id).into(),
1463            f.text_msg("* An edited event")
1464                .edit(
1465                    original_id,
1466                    RoomMessageEventContentWithoutRelation::text_plain("An edited event"),
1467                )
1468                .event_id(related_id)
1469                .into(),
1470            f,
1471        )
1472        .await;
1473    }
1474
1475    #[async_test]
1476    async fn test_event_with_reply_relation() {
1477        let original_id = event_id!("$original");
1478        let related_id = event_id!("$related");
1479        let room_id = room_id!("!galette:saucisse.bzh");
1480        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1481
1482        assert_relations(
1483            room_id,
1484            f.text_msg("Original event").event_id(original_id).into(),
1485            f.text_msg("A reply").reply_to(original_id).event_id(related_id).into(),
1486            f,
1487        )
1488        .await;
1489    }
1490
1491    #[async_test]
1492    async fn test_event_with_thread_reply_relation() {
1493        let original_id = event_id!("$original");
1494        let related_id = event_id!("$related");
1495        let room_id = room_id!("!galette:saucisse.bzh");
1496        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1497
1498        assert_relations(
1499            room_id,
1500            f.text_msg("Original event").event_id(original_id).into(),
1501            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
1502            f,
1503        )
1504        .await;
1505    }
1506
1507    #[async_test]
1508    async fn test_event_with_reaction_relation() {
1509        let original_id = event_id!("$original");
1510        let related_id = event_id!("$related");
1511        let room_id = room_id!("!galette:saucisse.bzh");
1512        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1513
1514        assert_relations(
1515            room_id,
1516            f.text_msg("Original event").event_id(original_id).into(),
1517            f.reaction(original_id, ":D").event_id(related_id).into(),
1518            f,
1519        )
1520        .await;
1521    }
1522
1523    #[async_test]
1524    async fn test_event_with_poll_response_relation() {
1525        let original_id = event_id!("$original");
1526        let related_id = event_id!("$related");
1527        let room_id = room_id!("!galette:saucisse.bzh");
1528        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1529
1530        assert_relations(
1531            room_id,
1532            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
1533                .event_id(original_id)
1534                .into(),
1535            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
1536            f,
1537        )
1538        .await;
1539    }
1540
1541    #[async_test]
1542    async fn test_event_with_poll_end_relation() {
1543        let original_id = event_id!("$original");
1544        let related_id = event_id!("$related");
1545        let room_id = room_id!("!galette:saucisse.bzh");
1546        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1547
1548        assert_relations(
1549            room_id,
1550            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
1551                .event_id(original_id)
1552                .into(),
1553            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
1554            f,
1555        )
1556        .await;
1557    }
1558
1559    #[async_test]
1560    async fn test_event_with_filtered_relationships() {
1561        let original_id = event_id!("$original");
1562        let related_id = event_id!("$related");
1563        let associated_related_id = event_id!("$recursive_related");
1564        let room_id = room_id!("!galette:saucisse.bzh");
1565        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1566
1567        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
1568        let related_event = event_factory
1569            .text_msg("* Edited event")
1570            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
1571            .event_id(related_id)
1572            .into();
1573        let associated_related_event =
1574            event_factory.redaction(related_id).event_id(associated_related_id).into();
1575
1576        let client = logged_in_client(None).await;
1577
1578        let event_cache = client.event_cache();
1579        event_cache.subscribe().unwrap();
1580
1581        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1582        let room = client.get_room(room_id).unwrap();
1583
1584        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1585
1586        // Save the original event.
1587        room_event_cache.save_event(original_event).await;
1588
1589        // Save the related event.
1590        room_event_cache.save_event(related_event).await;
1591
1592        // Save the associated related event, which redacts the related event.
1593        room_event_cache.save_event(associated_related_event).await;
1594
1595        let filter = Some(vec![RelationType::Replacement]);
1596        let (event, related_events) =
1597            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1598        // Fetched event is the right one.
1599        let cached_event_id = event.event_id().unwrap();
1600        assert_eq!(cached_event_id, original_id);
1601
1602        // There are both the related id and the associatively related id
1603        assert_eq!(related_events.len(), 2);
1604
1605        let related_event_id = related_events[0].event_id().unwrap();
1606        assert_eq!(related_event_id, related_id);
1607        let related_event_id = related_events[1].event_id().unwrap();
1608        assert_eq!(related_event_id, associated_related_id);
1609
1610        // Now we'll filter on threads instead; there should be no related events.
1611        let filter = Some(vec![RelationType::Thread]);
1612        let (event, related_events) =
1613            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1614        // Fetched event is the right one.
1615        let cached_event_id = event.event_id().unwrap();
1616        assert_eq!(cached_event_id, original_id);
1617        // No `Thread`-related events are found.
1618        assert!(related_events.is_empty());
1619    }
1620
1621    #[async_test]
1622    async fn test_event_with_recursive_relation() {
1623        let original_id = event_id!("$original");
1624        let related_id = event_id!("$related");
1625        let associated_related_id = event_id!("$recursive_related");
1626        let room_id = room_id!("!galette:saucisse.bzh");
1627        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1628
1629        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
1630        let related_event = event_factory
1631            .text_msg("* Edited event")
1632            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
1633            .event_id(related_id)
1634            .into();
1635        let associated_related_event =
1636            event_factory.redaction(related_id).event_id(associated_related_id).into();
1637
1638        let client = logged_in_client(None).await;
1639
1640        let event_cache = client.event_cache();
1641        event_cache.subscribe().unwrap();
1642
1643        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1644        let room = client.get_room(room_id).unwrap();
1645
1646        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1647
1648        // Save the original event.
1649        room_event_cache.save_event(original_event).await;
1650
1651        // Save the related event.
1652        room_event_cache.save_event(related_event).await;
1653
1654        // Save the associated related event, which redacts the related event.
1655        room_event_cache.save_event(associated_related_event).await;
1656
1657        let (event, related_events) =
1658            room_event_cache.event_with_relations(original_id, None).await.unwrap();
1659        // Fetched event is the right one.
1660        let cached_event_id = event.event_id().unwrap();
1661        assert_eq!(cached_event_id, original_id);
1662
1663        // There are both the related id and the associatively related id
1664        assert_eq!(related_events.len(), 2);
1665
1666        let related_event_id = related_events[0].event_id().unwrap();
1667        assert_eq!(related_event_id, related_id);
1668        let related_event_id = related_events[1].event_id().unwrap();
1669        assert_eq!(related_event_id, associated_related_id);
1670    }
1671
1672    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1673    #[async_test]
1674    async fn test_write_to_storage() {
1675        use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1676
1677        let room_id = room_id!("!galette:saucisse.bzh");
1678        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1679
1680        let event_cache_store = Arc::new(MemoryStore::new());
1681
1682        let client = MockClientBuilder::new("http://localhost".to_owned())
1683            .store_config(
1684                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1685            )
1686            .build()
1687            .await;
1688
1689        let event_cache = client.event_cache();
1690
1691        // Don't forget to subscribe and like^W enable storage!
1692        event_cache.subscribe().unwrap();
1693        event_cache.enable_storage().unwrap();
1694
1695        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1696        let room = client.get_room(room_id).unwrap();
1697
1698        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1699
1700        // Propagate an update for a message and a prev-batch token.
1701        let timeline = Timeline {
1702            limited: true,
1703            prev_batch: Some("raclette".to_owned()),
1704            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
1705        };
1706
1707        room_event_cache
1708            .inner
1709            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1710            .await
1711            .unwrap();
1712
1713        let linked_chunk =
1714            from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1715                .unwrap()
1716                .unwrap();
1717
1718        assert_eq!(linked_chunk.chunks().count(), 2);
1719
1720        let mut chunks = linked_chunk.chunks();
1721
1722        // We start with the gap.
1723        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
1724            assert_eq!(gap.prev_token, "raclette");
1725        });
1726
1727        // Then we have the stored event.
1728        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1729            assert_eq!(events.len(), 1);
1730            let deserialized = events[0].raw().deserialize().unwrap();
1731            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
1732            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
1733        });
1734
1735        // That's all, folks!
1736        assert!(chunks.next().is_none());
1737    }
1738
1739    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1740    #[async_test]
1741    async fn test_write_to_storage_strips_bundled_relations() {
1742        use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1743        use ruma::events::BundledMessageLikeRelations;
1744
1745        let room_id = room_id!("!galette:saucisse.bzh");
1746        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1747
1748        let event_cache_store = Arc::new(MemoryStore::new());
1749
1750        let client = MockClientBuilder::new("http://localhost".to_owned())
1751            .store_config(
1752                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1753            )
1754            .build()
1755            .await;
1756
1757        let event_cache = client.event_cache();
1758
1759        // Don't forget to subscribe and like^W enable storage!
1760        event_cache.subscribe().unwrap();
1761        event_cache.enable_storage().unwrap();
1762
1763        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1764        let room = client.get_room(room_id).unwrap();
1765
1766        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1767
1768        // Propagate an update for a message with bundled relations.
1769        let mut relations = BundledMessageLikeRelations::new();
1770        relations.replace =
1771            Some(Box::new(f.text_msg("Hello, Kind Sir").sender(*ALICE).into_raw_sync()));
1772        let ev = f.text_msg("hey yo").sender(*ALICE).bundled_relations(relations).into_event();
1773
1774        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
1775
1776        room_event_cache
1777            .inner
1778            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1779            .await
1780            .unwrap();
1781
1782        // The in-memory linked chunk keeps the bundled relation.
1783        {
1784            let (events, _) = room_event_cache.subscribe().await;
1785
1786            assert_eq!(events.len(), 1);
1787
1788            let ev = events[0].raw().deserialize().unwrap();
1789            assert_let!(
1790                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
1791            );
1792
1793            let original = msg.as_original().unwrap();
1794            assert_eq!(original.content.body(), "hey yo");
1795            assert!(original.unsigned.relations.replace.is_some());
1796        }
1797
1798        // The one in storage does not.
1799        let linked_chunk =
1800            from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1801                .unwrap()
1802                .unwrap();
1803
1804        assert_eq!(linked_chunk.chunks().count(), 1);
1805
1806        let mut chunks = linked_chunk.chunks();
1807        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1808            assert_eq!(events.len(), 1);
1809
1810            let ev = events[0].raw().deserialize().unwrap();
1811            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
1812
1813            let original = msg.as_original().unwrap();
1814            assert_eq!(original.content.body(), "hey yo");
1815            assert!(original.unsigned.relations.replace.is_none());
1816        });
1817
1818        // That's all, folks!
1819        assert!(chunks.next().is_none());
1820    }
1821
1822    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1823    #[async_test]
1824    async fn test_clear() {
1825        use eyeball_im::VectorDiff;
1826        use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1827
1828        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
1829
1830        let room_id = room_id!("!galette:saucisse.bzh");
1831        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1832
1833        let event_cache_store = Arc::new(MemoryStore::new());
1834
1835        let event_id1 = event_id!("$1");
1836        let event_id2 = event_id!("$2");
1837
1838        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1839        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1840
1841        // Prefill the store with some data.
1842        event_cache_store
1843            .handle_linked_chunk_updates(
1844                room_id,
1845                vec![
1846                    // An empty items chunk.
1847                    Update::NewItemsChunk {
1848                        previous: None,
1849                        new: ChunkIdentifier::new(0),
1850                        next: None,
1851                    },
1852                    // A gap chunk.
1853                    Update::NewGapChunk {
1854                        previous: Some(ChunkIdentifier::new(0)),
1855                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
1856                        new: ChunkIdentifier::new(42),
1857                        next: None,
1858                        gap: Gap { prev_token: "comté".to_owned() },
1859                    },
1860                    // Another items chunk, non-empty this time.
1861                    Update::NewItemsChunk {
1862                        previous: Some(ChunkIdentifier::new(42)),
1863                        new: ChunkIdentifier::new(1),
1864                        next: None,
1865                    },
1866                    Update::PushItems {
1867                        at: Position::new(ChunkIdentifier::new(1), 0),
1868                        items: vec![ev1.clone()],
1869                    },
1870                    // And another items chunk, non-empty again.
1871                    Update::NewItemsChunk {
1872                        previous: Some(ChunkIdentifier::new(1)),
1873                        new: ChunkIdentifier::new(2),
1874                        next: None,
1875                    },
1876                    Update::PushItems {
1877                        at: Position::new(ChunkIdentifier::new(2), 0),
1878                        items: vec![ev2.clone()],
1879                    },
1880                ],
1881            )
1882            .await
1883            .unwrap();
1884
1885        let client = MockClientBuilder::new("http://localhost".to_owned())
1886            .store_config(
1887                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1888            )
1889            .build()
1890            .await;
1891
1892        let event_cache = client.event_cache();
1893
1894        // Don't forget to subscribe and like^W enable storage!
1895        event_cache.subscribe().unwrap();
1896        event_cache.enable_storage().unwrap();
1897
1898        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1899        let room = client.get_room(room_id).unwrap();
1900
1901        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1902
1903        let (items, mut stream) = room_event_cache.subscribe().await;
1904
1905        // The room knows about all cached events.
1906        {
1907            assert!(room_event_cache.event(event_id1).await.is_some());
1908            assert!(room_event_cache.event(event_id2).await.is_some());
1909        }
1910
1911        // But only some of the events are loaded from the store.
1912        {
1913            // The room must contain only one event because only one chunk has been loaded.
1914            assert_eq!(items.len(), 1);
1915            assert_eq!(items[0].event_id().unwrap(), event_id2);
1916
1917            assert!(stream.is_empty());
1918        }
1919
1920        // Let's load more chunks to load all events.
1921        {
1922            room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1923
1924            assert_let_timeout!(
1925                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1926            );
1927            assert_eq!(diffs.len(), 1);
1928            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1929                // Here you are `event_id1`!
1930                assert_eq!(event.event_id().unwrap(), event_id1);
1931            });
1932
1933            assert!(stream.is_empty());
1934        }
1935
1936        // After clearing,…
1937        room_event_cache.clear().await.unwrap();
1938
1939        //… we get an update that the content has been cleared.
1940        assert_let_timeout!(
1941            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1942        );
1943        assert_eq!(diffs.len(), 1);
1944        assert_let!(VectorDiff::Clear = &diffs[0]);
1945
1946        // The room event cache has forgotten about the events.
1947        assert!(room_event_cache.event(event_id1).await.is_none());
1948
1949        let (items, _) = room_event_cache.subscribe().await;
1950        assert!(items.is_empty());
1951
1952        // The event cache store too.
1953        let linked_chunk =
1954            from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1955                .unwrap()
1956                .unwrap();
1957
1958        // Note: while the event cache store could return `None` here, clearing it will
1959        // reset it to its initial form, maintaining the invariant that it
1960        // contains a single items chunk that's empty.
1961        assert_eq!(linked_chunk.num_items(), 0);
1962    }
1963
1964    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1965    #[async_test]
1966    async fn test_load_from_storage() {
1967        use eyeball_im::VectorDiff;
1968
1969        use super::RoomEventCacheUpdate;
1970        use crate::assert_let_timeout;
1971
1972        let room_id = room_id!("!galette:saucisse.bzh");
1973        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1974
1975        let event_cache_store = Arc::new(MemoryStore::new());
1976
1977        let event_id1 = event_id!("$1");
1978        let event_id2 = event_id!("$2");
1979
1980        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1981        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1982
1983        // Prefill the store with some data.
1984        event_cache_store
1985            .handle_linked_chunk_updates(
1986                room_id,
1987                vec![
1988                    // An empty items chunk.
1989                    Update::NewItemsChunk {
1990                        previous: None,
1991                        new: ChunkIdentifier::new(0),
1992                        next: None,
1993                    },
1994                    // A gap chunk.
1995                    Update::NewGapChunk {
1996                        previous: Some(ChunkIdentifier::new(0)),
1997                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
1998                        new: ChunkIdentifier::new(42),
1999                        next: None,
2000                        gap: Gap { prev_token: "cheddar".to_owned() },
2001                    },
2002                    // Another items chunk, non-empty this time.
2003                    Update::NewItemsChunk {
2004                        previous: Some(ChunkIdentifier::new(42)),
2005                        new: ChunkIdentifier::new(1),
2006                        next: None,
2007                    },
2008                    Update::PushItems {
2009                        at: Position::new(ChunkIdentifier::new(1), 0),
2010                        items: vec![ev1.clone()],
2011                    },
2012                    // And another items chunk, non-empty again.
2013                    Update::NewItemsChunk {
2014                        previous: Some(ChunkIdentifier::new(1)),
2015                        new: ChunkIdentifier::new(2),
2016                        next: None,
2017                    },
2018                    Update::PushItems {
2019                        at: Position::new(ChunkIdentifier::new(2), 0),
2020                        items: vec![ev2.clone()],
2021                    },
2022                ],
2023            )
2024            .await
2025            .unwrap();
2026
2027        let client = MockClientBuilder::new("http://localhost".to_owned())
2028            .store_config(
2029                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
2030            )
2031            .build()
2032            .await;
2033
2034        let event_cache = client.event_cache();
2035
2036        // Don't forget to subscribe and like^W enable storage!
2037        event_cache.subscribe().unwrap();
2038        event_cache.enable_storage().unwrap();
2039
2040        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2041        let room = client.get_room(room_id).unwrap();
2042
2043        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2044
2045        let (items, mut stream) = room_event_cache.subscribe().await;
2046
2047        // The initial items contain one event because only the last chunk is loaded by
2048        // default.
2049        assert_eq!(items.len(), 1);
2050        assert_eq!(items[0].event_id().unwrap(), event_id2);
2051        assert!(stream.is_empty());
2052
2053        // The event cache knows about all events though, even if they aren't loaded.
2054        assert!(room_event_cache.event(event_id1).await.is_some());
2055        assert!(room_event_cache.event(event_id2).await.is_some());
2056
2057        // Let's paginate to load more events.
2058        room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2059
2060        assert_let_timeout!(
2061            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2062        );
2063        assert_eq!(diffs.len(), 1);
2064        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2065            assert_eq!(event.event_id().unwrap(), event_id1);
2066        });
2067
2068        assert!(stream.is_empty());
2069
2070        // A new update with one of these events leads to deduplication.
2071        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
2072        room_event_cache
2073            .inner
2074            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
2075            .await
2076            .unwrap();
2077
2078        // The stream doesn't report these changes *yet*. Use the items vector given
2079        // when subscribing, to check that the items correspond to their new
2080        // positions. The duplicated item is removed (so it's not the first
2081        // element anymore), and it's added to the back of the list.
2082        let (items, _stream) = room_event_cache.subscribe().await;
2083        assert_eq!(items.len(), 2);
2084        assert_eq!(items[0].event_id().unwrap(), event_id1);
2085        assert_eq!(items[1].event_id().unwrap(), event_id2);
2086    }
2087
2088    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
2089    #[async_test]
2090    async fn test_load_from_storage_resilient_to_failure() {
2091        let room_id = room_id!("!fondue:patate.ch");
2092        let event_cache_store = Arc::new(MemoryStore::new());
2093
2094        let event = EventFactory::new()
2095            .room(room_id)
2096            .sender(user_id!("@ben:saucisse.bzh"))
2097            .text_msg("foo")
2098            .event_id(event_id!("$42"))
2099            .into_event();
2100
2101        // Prefill the store with invalid data: two chunks that form a cycle.
2102        event_cache_store
2103            .handle_linked_chunk_updates(
2104                room_id,
2105                vec![
2106                    Update::NewItemsChunk {
2107                        previous: None,
2108                        new: ChunkIdentifier::new(0),
2109                        next: None,
2110                    },
2111                    Update::PushItems {
2112                        at: Position::new(ChunkIdentifier::new(0), 0),
2113                        items: vec![event],
2114                    },
2115                    Update::NewItemsChunk {
2116                        previous: Some(ChunkIdentifier::new(0)),
2117                        new: ChunkIdentifier::new(1),
2118                        next: Some(ChunkIdentifier::new(0)),
2119                    },
2120                ],
2121            )
2122            .await
2123            .unwrap();
2124
2125        let client = MockClientBuilder::new("http://localhost".to_owned())
2126            .store_config(
2127                StoreConfig::new("holder".to_owned()).event_cache_store(event_cache_store.clone()),
2128            )
2129            .build()
2130            .await;
2131
2132        let event_cache = client.event_cache();
2133
2134        // Don't forget to subscribe and like^W enable storage!
2135        event_cache.subscribe().unwrap();
2136        event_cache.enable_storage().unwrap();
2137
2138        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2139        let room = client.get_room(room_id).unwrap();
2140
2141        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2142
2143        let (items, _stream) = room_event_cache.subscribe().await;
2144
2145        // Because the persisted content was invalid, the room store is reset: there are
2146        // no events in the cache.
2147        assert!(items.is_empty());
2148
2149        // Storage doesn't contain anything. It would also be valid for it to contain a
2150        // single initial empty items chunk.
2151        let raw_chunks = event_cache_store.load_all_chunks(room_id).await.unwrap();
2152        assert!(raw_chunks.is_empty());
2153    }
2154
2155    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
2156    #[async_test]
2157    async fn test_no_useless_gaps() {
2158        use crate::event_cache::room::LoadMoreEventsBackwardsOutcome;
2159
2160        let room_id = room_id!("!galette:saucisse.bzh");
2161
2162        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
2163
2164        let event_cache = client.event_cache();
2165        event_cache.subscribe().unwrap();
2166
2167        let has_storage = true; // for testing purposes only
2168        event_cache.enable_storage().unwrap();
2169
2170        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2171        let room = client.get_room(room_id).unwrap();
2172        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2173
2174        let f = EventFactory::new().room(room_id).sender(*ALICE);
2175
2176        // Propagate an update including a limited timeline with one message and a
2177        // prev-batch token.
2178        room_event_cache
2179            .inner
2180            .handle_joined_room_update(
2181                has_storage,
2182                JoinedRoomUpdate {
2183                    timeline: Timeline {
2184                        limited: true,
2185                        prev_batch: Some("raclette".to_owned()),
2186                        events: vec![f.text_msg("hey yo").into_event()],
2187                    },
2188                    ..Default::default()
2189                },
2190            )
2191            .await
2192            .unwrap();
2193
2194        {
2195            let mut state = room_event_cache.inner.state.write().await;
2196
2197            let mut num_gaps = 0;
2198            let mut num_events = 0;
2199
2200            for c in state.events().chunks() {
2201                match c.content() {
2202                    ChunkContent::Items(items) => num_events += items.len(),
2203                    ChunkContent::Gap(_) => num_gaps += 1,
2204                }
2205            }
2206
2207            // The limited sync unloads the chunk, so it will appear as if there are only
2208            // the events.
2209            assert_eq!(num_gaps, 0);
2210            assert_eq!(num_events, 1);
2211
2212            // But if I manually reload more of the chunk, the gap will be present.
2213            assert_matches!(
2214                state.load_more_events_backwards().await.unwrap(),
2215                LoadMoreEventsBackwardsOutcome::Gap { .. }
2216            );
2217
2218            num_gaps = 0;
2219            num_events = 0;
2220            for c in state.events().chunks() {
2221                match c.content() {
2222                    ChunkContent::Items(items) => num_events += items.len(),
2223                    ChunkContent::Gap(_) => num_gaps += 1,
2224                }
2225            }
2226
2227            // The gap must have been stored.
2228            assert_eq!(num_gaps, 1);
2229            assert_eq!(num_events, 1);
2230        }
2231
2232        // Now, propagate an update for another message, but the timeline isn't limited
2233        // this time.
2234        room_event_cache
2235            .inner
2236            .handle_joined_room_update(
2237                has_storage,
2238                JoinedRoomUpdate {
2239                    timeline: Timeline {
2240                        limited: false,
2241                        prev_batch: Some("fondue".to_owned()),
2242                        events: vec![f.text_msg("sup").into_event()],
2243                    },
2244                    ..Default::default()
2245                },
2246            )
2247            .await
2248            .unwrap();
2249
2250        {
2251            let state = room_event_cache.inner.state.read().await;
2252
2253            let mut num_gaps = 0;
2254            let mut num_events = 0;
2255
2256            for c in state.events().chunks() {
2257                match c.content() {
2258                    ChunkContent::Items(items) => num_events += items.len(),
2259                    ChunkContent::Gap(gap) => {
2260                        assert_eq!(gap.prev_token, "raclette");
2261                        num_gaps += 1;
2262                    }
2263                }
2264            }
2265
2266            // There's only the previous gap, no new ones.
2267            assert_eq!(num_gaps, 1);
2268            assert_eq!(num_events, 2);
2269        }
2270    }
2271
2272    async fn assert_relations(
2273        room_id: &RoomId,
2274        original_event: TimelineEvent,
2275        related_event: TimelineEvent,
2276        event_factory: EventFactory,
2277    ) {
2278        let client = logged_in_client(None).await;
2279
2280        let event_cache = client.event_cache();
2281        event_cache.subscribe().unwrap();
2282
2283        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2284        let room = client.get_room(room_id).unwrap();
2285
2286        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2287
2288        // Save the original event.
2289        let original_event_id = original_event.event_id().unwrap();
2290        room_event_cache.save_event(original_event).await;
2291
2292        // Save an unrelated event to check it's not in the related events list.
2293        let unrelated_id = event_id!("$2");
2294        room_event_cache
2295            .save_event(event_factory.text_msg("An unrelated event").event_id(unrelated_id).into())
2296            .await;
2297
2298        // Save the related event.
2299        let related_id = related_event.event_id().unwrap();
2300        room_event_cache.save_event(related_event).await;
2301
2302        let (event, related_events) =
2303            room_event_cache.event_with_relations(&original_event_id, None).await.unwrap();
2304        // Fetched event is the right one.
2305        let cached_event_id = event.event_id().unwrap();
2306        assert_eq!(cached_event_id, original_event_id);
2307
2308        // Only the actually related event appears in the list of related events.
2309        assert_eq!(related_events.len(), 1);
2310        let related_event_id = related_events[0].event_id().unwrap();
2311        assert_eq!(related_event_id, related_id);
2312    }
2313
2314    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
2315    #[async_test]
2316    async fn test_shrink_to_last_chunk() {
2317        use eyeball_im::VectorDiff;
2318
2319        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
2320
2321        let room_id = room_id!("!galette:saucisse.bzh");
2322
2323        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
2324
2325        let f = EventFactory::new().room(room_id);
2326
2327        let evid1 = event_id!("$1");
2328        let evid2 = event_id!("$2");
2329
2330        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
2331        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
2332
2333        // Fill the event cache store with an initial linked chunk containing 2 event chunks.
2334        {
2335            let store = client.event_cache_store();
2336            let store = store.lock().await.unwrap();
2337            store
2338                .handle_linked_chunk_updates(
2339                    room_id,
2340                    vec![
2341                        Update::NewItemsChunk {
2342                            previous: None,
2343                            new: ChunkIdentifier::new(0),
2344                            next: None,
2345                        },
2346                        Update::PushItems {
2347                            at: Position::new(ChunkIdentifier::new(0), 0),
2348                            items: vec![ev1],
2349                        },
2350                        Update::NewItemsChunk {
2351                            previous: Some(ChunkIdentifier::new(0)),
2352                            new: ChunkIdentifier::new(1),
2353                            next: None,
2354                        },
2355                        Update::PushItems {
2356                            at: Position::new(ChunkIdentifier::new(1), 0),
2357                            items: vec![ev2],
2358                        },
2359                    ],
2360                )
2361                .await
2362                .unwrap();
2363        }
2364
2365        let event_cache = client.event_cache();
2366        event_cache.subscribe().unwrap();
2367        event_cache.enable_storage().unwrap();
2368
2369        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2370        let room = client.get_room(room_id).unwrap();
2371        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2372
2373        // Sanity check: lazily loaded, so only includes one item at start.
2374        let (events, mut stream) = room_event_cache.subscribe().await;
2375        assert_eq!(events.len(), 1);
2376        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
2377        assert!(stream.is_empty());
2378
2379        // Force loading the full linked chunk by back-paginating.
2380        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2381        assert_eq!(outcome.events.len(), 1);
2382        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2383        assert!(outcome.reached_start);
2384
2385        // We also get an update about the loading from the store.
2386        assert_let_timeout!(
2387            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2388        );
2389        assert_eq!(diffs.len(), 1);
2390        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
2391            assert_eq!(value.event_id().as_deref(), Some(evid1));
2392        });
2393
2394        assert!(stream.is_empty());
2395
2396        // Shrink the linked chunk to the last chunk.
2397        let diffs = room_event_cache
2398            .inner
2399            .state
2400            .write()
2401            .await
2402            .shrink_to_last_chunk()
2403            .await
2404            .expect("shrinking should succeed")
2405            .unwrap();
2406
2407        // We receive updates about the changes to the linked chunk.
2408        assert_eq!(diffs.len(), 2);
2409        assert_matches!(&diffs[0], VectorDiff::Clear);
2410        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
2411            assert_eq!(values.len(), 1);
2412            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
2413        });
2414
2415        assert!(stream.is_empty());
2416
2417        // When reading the events, we do get only the last one.
2418        let (events, _) = room_event_cache.subscribe().await;
2419        assert_eq!(events.len(), 1);
2420        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
2421
2422        // But if we back-paginate, we don't need network access to find out about
2423        // the previous event.
2424        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2425        assert_eq!(outcome.events.len(), 1);
2426        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2427        assert!(outcome.reached_start);
2428    }
2429
2430    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
2431    #[async_test]
2432    async fn test_auto_shrink_after_all_subscribers_are_gone() {
2433        use eyeball_im::VectorDiff;
2434        use tokio::task::yield_now;
2435
2436        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
2437
2438        let room_id = room_id!("!galette:saucisse.bzh");
2439
2440        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
2441
2442        let f = EventFactory::new().room(room_id);
2443
2444        let evid1 = event_id!("$1");
2445        let evid2 = event_id!("$2");
2446
2447        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
2448        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
2449
2450        // Fill the event cache store with an initial linked chunk containing 2 event chunks.
2451        {
2452            let store = client.event_cache_store();
2453            let store = store.lock().await.unwrap();
2454            store
2455                .handle_linked_chunk_updates(
2456                    room_id,
2457                    vec![
2458                        Update::NewItemsChunk {
2459                            previous: None,
2460                            new: ChunkIdentifier::new(0),
2461                            next: None,
2462                        },
2463                        Update::PushItems {
2464                            at: Position::new(ChunkIdentifier::new(0), 0),
2465                            items: vec![ev1],
2466                        },
2467                        Update::NewItemsChunk {
2468                            previous: Some(ChunkIdentifier::new(0)),
2469                            new: ChunkIdentifier::new(1),
2470                            next: None,
2471                        },
2472                        Update::PushItems {
2473                            at: Position::new(ChunkIdentifier::new(1), 0),
2474                            items: vec![ev2],
2475                        },
2476                    ],
2477                )
2478                .await
2479                .unwrap();
2480        }
2481
2482        let event_cache = client.event_cache();
2483        event_cache.subscribe().unwrap();
2484        event_cache.enable_storage().unwrap();
2485
2486        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2487        let room = client.get_room(room_id).unwrap();
2488        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2489
2490        // Sanity check: lazily loaded, so only includes one item at start.
2491        let (events1, mut stream1) = room_event_cache.subscribe().await;
2492        assert_eq!(events1.len(), 1);
2493        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
2494        assert!(stream1.is_empty());
2495
2496        // Force loading the full linked chunk by back-paginating.
2497        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2498        assert_eq!(outcome.events.len(), 1);
2499        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2500        assert!(outcome.reached_start);
2501
2502        // We also get an update about the loading from the store. Ignore it, for this
2503        // test's sake.
2504        assert_let_timeout!(
2505            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
2506        );
2507        assert_eq!(diffs.len(), 1);
2508        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
2509            assert_eq!(value.event_id().as_deref(), Some(evid1));
2510        });
2511
2512        assert!(stream1.is_empty());
2513
2514        // Have another listener subscribe to the event cache.
2515        // Since it's not the first one, and the previous one loaded some more events,
2516        // the second listener sees them all.
2517        let (events2, stream2) = room_event_cache.subscribe().await;
2518        assert_eq!(events2.len(), 2);
2519        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
2520        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
2521        assert!(stream2.is_empty());
2522
2523        // Drop the first stream, and wait a bit.
2524        drop(stream1);
2525        yield_now().await;
2526
2527        // The second stream remains undisturbed.
2528        assert!(stream2.is_empty());
2529
2530        // Now drop the second stream, and wait a bit.
2531        drop(stream2);
2532        yield_now().await;
2533
2534        // The linked chunk must have auto-shrunk by now.
2535
2536        {
2537            // Check the inner state: there's no more shared auto-shrinker.
2538            let state = room_event_cache.inner.state.read().await;
2539            assert_eq!(state.listener_count.load(std::sync::atomic::Ordering::SeqCst), 0);
2540        }
2541
2542        // Getting the events will only give us the latest chunk.
2543        let (events3, _stream2) = room_event_cache.subscribe().await;
2544        assert_eq!(events3.len(), 1);
2545        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
2546    }
2547}