Skip to main content

matrix_sdk/event_cache/caches/thread/
mod.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Threads-related data structures.
16
17pub mod pagination;
18mod state;
19mod updates;
20
21use std::{fmt, sync::Arc};
22
23use matrix_sdk_base::{
24    event_cache::Event,
25    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
26};
27use ruma::{
28    EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, events::relation::RelationType,
29    room_version_rules::RoomVersionRules,
30};
31use tokio::sync::{
32    Notify,
33    broadcast::{Receiver, Sender},
34};
35use tracing::{instrument, trace};
36
37use self::pagination::ThreadPagination;
38pub(in super::super) use self::state::ThreadEventCacheState;
39pub(super) use self::updates::ThreadEventCacheUpdateSender;
40#[cfg(feature = "e2e-encryption")]
41use super::super::redecryptor::ResolvedUtd;
42use super::{
43    super::{
44        Result,
45        states::{CacheStateLock, StateLock, selectors::ThreadStateSelector},
46    },
47    EventsOrigin, TimelineVectorDiffs,
48    room::{RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate},
49};
50use crate::room::WeakRoom;
51
52/// All the information related to a single thread.
53///
54/// Cloning is shallow, and thus is cheap to do.
55#[derive(Clone)]
56pub struct ThreadEventCache {
57    inner: Arc<ThreadEventCacheInner>,
58}
59
60/// The (non-cloneable) details of the `ThreadEventCache`.
61struct ThreadEventCacheInner {
62    /// The room ID.
63    room_id: OwnedRoomId,
64
65    /// The thread root ID.
66    thread_id: OwnedEventId,
67
68    /// The room where this thread belongs to.
69    weak_room: WeakRoom,
70
71    /// State for this thread's event cache.
72    state: CacheStateLock<ThreadStateSelector>,
73
74    /// A notifier that we received a new pagination token.
75    pagination_batch_token_notifier: Notify,
76
77    /// Update sender for this thread.
78    update_sender: ThreadEventCacheUpdateSender,
79}
80
81impl fmt::Debug for ThreadEventCache {
82    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83        f.debug_struct("ThreadEventCache").finish_non_exhaustive()
84    }
85}
86
87impl ThreadEventCache {
88    /// Create a new empty thread event cache.
89    #[allow(clippy::too_many_arguments)]
90    pub(super) async fn new(
91        room_id: OwnedRoomId,
92        thread_id: OwnedEventId,
93        own_user_id: OwnedUserId,
94        room_version_rules: RoomVersionRules,
95        weak_room: WeakRoom,
96        state: &StateLock,
97        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
98        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
99    ) -> Result<Self> {
100        let update_sender = ThreadEventCacheUpdateSender::new(generic_update_sender.clone());
101
102        let cache_state = state
103            .try_insert_once_with(
104                ThreadStateSelector::new(room_id.clone(), thread_id.clone()),
105                |store_guard| {
106                    ThreadEventCacheState::new(
107                        room_id.clone(),
108                        thread_id.clone(),
109                        own_user_id,
110                        room_version_rules,
111                        store_guard,
112                        update_sender.clone(),
113                        linked_chunk_update_sender,
114                    )
115                },
116            )
117            .await?;
118
119        let timeline_is_not_empty =
120            cache_state.read().await?.thread_linked_chunk().revents().next().is_some();
121
122        let cache = Self {
123            inner: Arc::new(ThreadEventCacheInner {
124                room_id: room_id.clone(),
125                thread_id,
126                weak_room,
127                state: cache_state,
128                pagination_batch_token_notifier: Notify::new(),
129                update_sender,
130            }),
131        };
132
133        // If at least one event has been loaded, it means there is a timeline. Let's
134        // emit a generic update.
135        if timeline_is_not_empty {
136            let _ = generic_update_sender
137                .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
138        }
139
140        Ok(cache)
141    }
142
143    /// Get the room ID for this room.
144    pub fn room_id(&self) -> &RoomId {
145        &self.inner.room_id
146    }
147
148    /// Get the thread ID for this thread.
149    pub fn thread_id(&self) -> &EventId {
150        &self.inner.thread_id
151    }
152
153    /// Subscribe to live events from this thread.
154    pub async fn subscribe(&self) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
155        let state = self.inner.state.read().await?;
156
157        let events =
158            state.thread_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
159
160        let recv = state.update_sender.new_thread_receiver();
161
162        Ok((events, recv))
163    }
164
165    /// Return a [`ThreadPagination`] useful for running back-pagination queries
166    /// in this thread.
167    pub fn pagination(&self) -> ThreadPagination {
168        ThreadPagination::new(self.inner.clone())
169    }
170
171    /// Return a reference to the state.
172    pub(in super::super) fn state(&self) -> &CacheStateLock<ThreadStateSelector> {
173        &self.inner.state
174    }
175
176    /// Handle a [`JoinedRoomUpdate`].
177    #[instrument(skip_all, fields(room_id = %self.inner.room_id, thread_root = %self.inner.thread_id))]
178    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
179        self.handle_timeline(updates.timeline).await?;
180
181        Ok(())
182    }
183
184    /// Handle a [`LeftRoomUpdate`].
185    #[instrument(skip_all, fields(room_id = %self.inner.room_id, thread_root = %self.inner.thread_id))]
186    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
187        self.handle_timeline(updates.timeline).await?;
188
189        Ok(())
190    }
191
192    /// Handle a [`Timeline`], i.e. new events received by a sync for this
193    /// thread.
194    async fn handle_timeline(&self, timeline: Timeline) -> Result<()> {
195        if timeline.events.is_empty() && timeline.prev_batch.is_none() {
196            return Ok(());
197        }
198
199        trace!("adding new events");
200
201        let mut state = self.inner.state.write().await?;
202
203        let (stored_prev_batch_token, timeline_event_diffs) = state.handle_sync(timeline).await?;
204
205        // Now that all events have been added, we can trigger the
206        // `pagination_token_notifier`.
207        if stored_prev_batch_token {
208            self.inner.pagination_batch_token_notifier.notify_one();
209        }
210
211        if !timeline_event_diffs.is_empty() {
212            state.update_sender.send(
213                TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Sync },
214                // This function is part of the `RoomEventCache` flow. The generic update is
215                // handled by it.
216                None,
217            );
218        }
219
220        Ok(())
221    }
222
223    /// Find a single event in this thread.
224    ///
225    /// It starts by looking into loaded events in `EventLinkedChunk` before
226    /// looking inside the storage.
227    pub(super) async fn find_event(
228        &self,
229        event_id: &EventId,
230    ) -> Result<Option<(super::EventLocation, Event)>> {
231        self.inner.state.read().await?.find_event(event_id).await
232    }
233
234    /// Try to find an event by ID in this thread, along with its related
235    /// events.
236    ///
237    /// You can filter which types of related events to retrieve using
238    /// `filter`. `None` will retrieve related events of any type.
239    ///
240    /// The related events are sorted like this:
241    ///
242    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
243    ///   located at the beginning of the array.
244    /// - events present in the linked chunk (be it in memory or in the storage)
245    ///   will be sorted according to their ordering in the linked chunk.
246    pub async fn find_event_with_relations(
247        &self,
248        event_id: &EventId,
249        filter: Option<Vec<RelationType>>,
250    ) -> Result<Option<(Event, Vec<Event>)>> {
251        // Search in all loaded or stored events.
252        Ok(self
253            .inner
254            .state
255            .read()
256            .await?
257            .find_event_with_relations(event_id, filter)
258            .await
259            .ok()
260            .flatten())
261    }
262
263    /// Try to locate the events in the linked chunk corresponding to the given
264    /// list of decrypted events, and replace them, while alerting observers
265    /// about the update.
266    ///
267    /// Return `true` if at least one event has been updated.
268    #[cfg(feature = "e2e-encryption")]
269    pub(in super::super) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<bool> {
270        let mut state = self.inner.state.write().await?;
271        let timeline_event_diffs = state.replace_utds(events).await?;
272
273        Ok(
274            if let Some(timeline_event_diffs) = timeline_event_diffs
275                && !timeline_event_diffs.is_empty()
276            {
277                state.update_sender.send(
278                    TimelineVectorDiffs {
279                        diffs: timeline_event_diffs,
280                        origin: EventsOrigin::Cache,
281                    },
282                    Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
283                );
284
285                true
286            } else {
287                false
288            },
289        )
290    }
291}
292
293#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
294mod timed_tests {
295    use std::sync::Arc;
296
297    use assert_matches::assert_matches;
298    use assert_matches2::assert_let;
299    use eyeball_im::VectorDiff;
300    use futures_util::FutureExt as _;
301    use matrix_sdk_base::{
302        RoomState, ThreadingSupport,
303        cross_process_lock::CrossProcessLockConfig,
304        event_cache::{
305            Gap,
306            store::{EventCacheStore as _, MemoryStore},
307        },
308        linked_chunk::{
309            ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
310            lazy_loader::from_all_chunks,
311        },
312        store::StoreConfig,
313        sync::{JoinedRoomUpdate, Timeline},
314    };
315    use matrix_sdk_test::{async_test, event_factory::EventFactory};
316    use ruma::{
317        event_id,
318        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
319        room_id, user_id,
320    };
321
322    use super::super::{super::RoomEventCacheGenericUpdate, TimelineVectorDiffs};
323    use crate::test_utils::client::MockClientBuilder;
324
325    #[async_test]
326    async fn test_write_to_storage() {
327        let room_id = room_id!("!r0");
328        let thread_root = event_id!("$t0_ev0");
329        let thread_event_id_0 = event_id!("$t0_ev1");
330
331        let f = EventFactory::new().room(room_id).sender(user_id!("@mnt_io:matrix.org"));
332
333        let event_cache_store = Arc::new(MemoryStore::new());
334
335        let client = MockClientBuilder::new(None)
336            .on_builder(|builder| {
337                builder
338                    .store_config(
339                        StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
340                            .event_cache_store(event_cache_store.clone()),
341                    )
342                    .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
343            })
344            .build()
345            .await;
346
347        let event_cache = client.event_cache();
348        event_cache.subscribe().unwrap();
349
350        client.base_client().get_or_create_room(room_id, RoomState::Joined);
351
352        let (thread_event_cache, _drop_handles) =
353            event_cache.thread(room_id, thread_root).await.unwrap();
354        let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
355
356        assert!(thread_events.is_empty());
357
358        // Propagate an update for a message and a prev-batch token.
359        let timeline = Timeline {
360            limited: true,
361            prev_batch: Some("raclette".to_owned()),
362            events: vec![
363                f.text_msg("salut")
364                    .event_id(thread_event_id_0)
365                    .in_thread(thread_root, thread_root)
366                    .into_event(),
367            ],
368        };
369
370        thread_event_cache
371            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
372            .await
373            .unwrap();
374
375        assert_matches!(
376            thread_stream.recv().await,
377            Ok(TimelineVectorDiffs { diffs, .. }) => {
378                assert_eq!(diffs.len(), 2);
379                assert_matches!(&diffs[0], VectorDiff::Clear);
380                assert_matches!(&diffs[1], VectorDiff::Append { values: events } => {
381                    assert_eq!(events.len(), 1);
382                    assert_eq!(events[0].event_id(), Some(thread_event_id_0));
383                });
384            }
385        );
386        assert!(thread_stream.is_empty());
387
388        // Check the storage.
389        let linked_chunk = from_all_chunks::<3, _, _>(
390            event_cache_store
391                .load_all_chunks(LinkedChunkId::Thread(room_id, thread_root))
392                .await
393                .unwrap(),
394        )
395        .unwrap()
396        .unwrap();
397
398        assert_eq!(linked_chunk.chunks().count(), 2);
399
400        let mut chunks = linked_chunk.chunks();
401
402        // We start with the gap.
403        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
404            assert_eq!(gap.token, "raclette");
405        });
406
407        // Then we have the stored event.
408        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
409            assert_eq!(events.len(), 1);
410            assert_eq!(events[0].event_id(), Some(thread_event_id_0));
411        });
412
413        // That's all, folks!
414        assert!(chunks.next().is_none());
415    }
416
417    #[async_test]
418    async fn test_write_to_storage_strips_bundled_relations() {
419        let sender = user_id!("@mnt_io:matrix.org");
420        let room_id = room_id!("!r0");
421        let thread_root = event_id!("$t0_ev0");
422        let thread_event_id_0 = event_id!("$t0_ev1");
423
424        let f = EventFactory::new().room(room_id).sender(sender);
425
426        let event_cache_store = Arc::new(MemoryStore::new());
427
428        let client = MockClientBuilder::new(None)
429            .on_builder(|builder| {
430                builder
431                    .store_config(
432                        StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
433                            .event_cache_store(event_cache_store.clone()),
434                    )
435                    .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
436            })
437            .build()
438            .await;
439
440        let event_cache = client.event_cache();
441        event_cache.subscribe().unwrap();
442
443        client.base_client().get_or_create_room(room_id, RoomState::Joined);
444
445        let (thread_event_cache, _drop_handles) =
446            event_cache.thread(room_id, thread_root).await.unwrap();
447
448        // Propagate an update for a message with bundled relations.
449        let timeline = Timeline {
450            limited: false,
451            prev_batch: None,
452            events: vec![
453                f.text_msg("s 'up")
454                    .event_id(thread_event_id_0)
455                    .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(sender))
456                    .in_thread(thread_root, thread_root)
457                    .into_event(),
458            ],
459        };
460
461        thread_event_cache
462            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
463            .await
464            .unwrap();
465
466        // The in-memory linked chunk keeps the bundled relation.
467        {
468            let (events, _) = thread_event_cache.subscribe().await.unwrap();
469
470            assert_eq!(events.len(), 1);
471
472            let event = events[0].raw().deserialize().unwrap();
473            assert_eq!(event.event_id(), thread_event_id_0);
474            assert_let!(
475                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) =
476                    event
477            );
478            assert!(msg.as_original().unwrap().unsigned.relations.replace.is_some());
479        }
480
481        // The one in storage does not.
482        let linked_chunk = from_all_chunks::<3, _, _>(
483            event_cache_store
484                .load_all_chunks(LinkedChunkId::Thread(room_id, thread_root))
485                .await
486                .unwrap(),
487        )
488        .unwrap()
489        .unwrap();
490
491        assert_eq!(linked_chunk.chunks().count(), 1);
492
493        let mut chunks = linked_chunk.chunks();
494        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
495            assert_eq!(events.len(), 1);
496
497            let event = events[0].raw().deserialize().unwrap();
498            assert_eq!(event.event_id(), thread_event_id_0);
499
500            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = event);
501            assert!(msg.as_original().unwrap().unsigned.relations.replace.is_none());
502        });
503
504        // That's all, folks!
505        assert!(chunks.next().is_none());
506    }
507
508    #[async_test]
509    async fn test_clear() {
510        let room_id = room_id!("!r0");
511        let f = EventFactory::new().room(room_id).sender(user_id!("@mnt_io:matrix.org"));
512
513        let event_cache_store = Arc::new(MemoryStore::new());
514
515        let thread_root = event_id!("$t0_ev0");
516        let thread_event_id_0 = event_id!("$t0_ev1");
517        let thread_event_id_1 = event_id!("$t0_ev2");
518
519        let thread_event_0 = f
520            .text_msg("foo")
521            .event_id(thread_event_id_0)
522            .in_thread(thread_root, thread_root)
523            .into_event();
524        let thread_event_1 = f
525            .text_msg("bar")
526            .event_id(thread_event_id_1)
527            .in_thread(thread_root, thread_event_id_0)
528            .into_event();
529
530        // Prefill the store with some data.
531        event_cache_store
532            .handle_linked_chunk_updates(
533                LinkedChunkId::Thread(room_id, thread_root),
534                vec![
535                    // An empty items chunk.
536                    Update::NewItemsChunk {
537                        previous: None,
538                        new: ChunkIdentifier::new(0),
539                        next: None,
540                    },
541                    // A gap chunk.
542                    Update::NewGapChunk {
543                        previous: Some(ChunkIdentifier::new(0)),
544                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
545                        new: ChunkIdentifier::new(42),
546                        next: None,
547                        gap: Gap { token: "comté".to_owned() },
548                    },
549                    // Another items chunk, non-empty this time.
550                    Update::NewItemsChunk {
551                        previous: Some(ChunkIdentifier::new(42)),
552                        new: ChunkIdentifier::new(1),
553                        next: None,
554                    },
555                    Update::PushItems {
556                        at: Position::new(ChunkIdentifier::new(1), 0),
557                        items: vec![thread_event_0.clone()],
558                    },
559                    // And another items chunk, non-empty again.
560                    Update::NewItemsChunk {
561                        previous: Some(ChunkIdentifier::new(1)),
562                        new: ChunkIdentifier::new(2),
563                        next: None,
564                    },
565                    Update::PushItems {
566                        at: Position::new(ChunkIdentifier::new(2), 0),
567                        items: vec![thread_event_1.clone()],
568                    },
569                ],
570            )
571            .await
572            .unwrap();
573
574        let client = MockClientBuilder::new(None)
575            .on_builder(|builder| {
576                builder
577                    .store_config(
578                        StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
579                            .event_cache_store(event_cache_store.clone()),
580                    )
581                    .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
582            })
583            .build()
584            .await;
585
586        let event_cache = client.event_cache();
587        event_cache.subscribe().unwrap();
588
589        client.base_client().get_or_create_room(room_id, RoomState::Joined);
590
591        let (thread_event_cache, _drop_handles) =
592            event_cache.thread(room_id, thread_root).await.unwrap();
593        let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
594
595        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
596
597        // The thread knows about all cached events.
598        {
599            assert!(thread_event_cache.find_event(thread_event_id_0).await.unwrap().is_some());
600            assert!(thread_event_cache.find_event(thread_event_id_1).await.unwrap().is_some());
601        }
602
603        // But only part of events are loaded from the store.
604        {
605            // The thread must contain only one event because only one chunk has been
606            // loaded.
607            assert_eq!(thread_events.len(), 1);
608            assert_eq!(thread_events[0].event_id().unwrap(), thread_event_id_1);
609
610            assert!(thread_stream.is_empty());
611        }
612
613        // Let's load more chunks to load all events.
614        {
615            thread_event_cache.pagination().run_backwards_once(20).await.unwrap();
616
617            assert_matches!(
618                thread_stream.recv().await,
619                Ok(TimelineVectorDiffs { diffs, .. }) => {
620                    assert_eq!(diffs.len(), 1);
621                    assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
622                        // Here you are `thread_event_0`!
623                        assert_eq!(event.event_id(), Some(thread_event_id_0));
624                    });
625                }
626            );
627            assert!(thread_stream.is_empty());
628
629            assert_matches!(
630                generic_stream.recv().await,
631                Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
632                    assert_eq!(room_id, expected_room_id);
633                }
634            );
635            assert!(generic_stream.is_empty());
636        }
637
638        // After clearing,…
639        event_cache.clear_all_rooms().await.unwrap();
640
641        //… we get an update that the content has been cleared.
642        assert_matches!(
643            thread_stream.recv().await,
644            Ok(TimelineVectorDiffs { diffs, .. }) => {
645                assert_eq!(diffs.len(), 2);
646                assert_matches!(&diffs[0], VectorDiff::Clear);
647                assert_matches!(&diffs[1], VectorDiff::Append { values } => {
648                    assert!(values.is_empty());
649                });
650            }
651        );
652
653        // … same with a generic update.
654        // (update for the clearing of the room)
655        assert_matches!(
656            generic_stream.recv().await,
657            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) => {
658                assert_eq!(received_room_id, room_id);
659            }
660        );
661        // (update for the clearing of the thread)
662        assert_matches!(
663            generic_stream.recv().await,
664            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) => {
665                assert_eq!(received_room_id, room_id);
666            }
667        );
668        assert!(generic_stream.is_empty());
669
670        // Events individually are forgotten by the event cache, after clearing the
671        // threads.
672        assert!(thread_event_cache.find_event(thread_event_id_0).await.unwrap().is_none());
673        assert!(thread_event_cache.find_event(thread_event_id_1).await.unwrap().is_none());
674
675        // And their presence in a linked chunk is forgotten.
676        let (thread_events, _) = thread_event_cache.subscribe().await.unwrap();
677        assert!(thread_events.is_empty());
678
679        // The event cache store is totally empty.
680        let linked_chunk = from_all_chunks::<3, _, _>(
681            event_cache_store
682                .load_all_chunks(LinkedChunkId::Thread(room_id, thread_root))
683                .await
684                .unwrap(),
685        )
686        .unwrap()
687        .unwrap();
688
689        // Note: while the event cache store could return `None` here, clearing it will
690        // reset it to its initial form, maintaining the invariant that it
691        // contains a single items chunk that's empty.
692        assert_eq!(linked_chunk.num_items(), 0);
693    }
694
695    #[async_test]
696    async fn test_load_from_storage() {
697        let room_id = room_id!("!r0");
698        let f = EventFactory::new().room(room_id).sender(user_id!("@mnt_io:matrix.org"));
699
700        let event_cache_store = Arc::new(MemoryStore::new());
701
702        let thread_root = event_id!("$t0");
703        let thread_event_id_0 = event_id!("$t0_ev0");
704        let thread_event_id_1 = event_id!("$t0_ev1");
705
706        let thread_event_0 = f
707            .text_msg("hello world")
708            .event_id(thread_event_id_0)
709            .in_thread(thread_root, thread_root)
710            .into_event();
711        let thread_event_1 = f
712            .text_msg("how's it going")
713            .event_id(thread_event_id_1)
714            .in_thread(thread_root, thread_event_id_1)
715            .into_event();
716
717        // Prefill the store with some data. The room usually has all events duplicated
718        // from the threads. It's important to make the test pass when checking the
719        // generic update.
720        let updates = vec![
721            // An empty items chunk.
722            Update::NewItemsChunk { previous: None, new: ChunkIdentifier::new(0), next: None },
723            // A gap chunk.
724            Update::NewGapChunk {
725                previous: Some(ChunkIdentifier::new(0)),
726                // Chunk IDs aren't supposed to be ordered, so use a random value here.
727                new: ChunkIdentifier::new(42),
728                next: None,
729                gap: Gap { token: "gruyère".to_owned() },
730            },
731            // Another items chunk, non-empty this time.
732            Update::NewItemsChunk {
733                previous: Some(ChunkIdentifier::new(42)),
734                new: ChunkIdentifier::new(1),
735                next: None,
736            },
737            Update::PushItems {
738                at: Position::new(ChunkIdentifier::new(1), 0),
739                items: vec![thread_event_0.clone()],
740            },
741            // And another items chunk, non-empty again.
742            Update::NewItemsChunk {
743                previous: Some(ChunkIdentifier::new(1)),
744                new: ChunkIdentifier::new(2),
745                next: None,
746            },
747            Update::PushItems {
748                at: Position::new(ChunkIdentifier::new(2), 0),
749                items: vec![thread_event_1.clone()],
750            },
751        ];
752        event_cache_store
753            .handle_linked_chunk_updates(LinkedChunkId::Room(room_id), updates.clone())
754            .await
755            .unwrap();
756        event_cache_store
757            .handle_linked_chunk_updates(LinkedChunkId::Thread(room_id, thread_root), updates)
758            .await
759            .unwrap();
760
761        let client = MockClientBuilder::new(None)
762            .on_builder(|builder| {
763                builder
764                    .store_config(
765                        StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
766                            .event_cache_store(event_cache_store.clone()),
767                    )
768                    .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
769            })
770            .build()
771            .await;
772
773        let event_cache = client.event_cache();
774        event_cache.subscribe().unwrap();
775
776        client.base_client().get_or_create_room(room_id, RoomState::Joined);
777
778        // Let's check whether the generic updates are received for the initialisation.
779        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
780        let (thread_event_cache, _drop_handles) =
781            event_cache.thread(room_id, thread_root).await.unwrap();
782        let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
783
784        // The room **and** the thread have been loaded. Two generic updates must have
785        // been triggered.
786        for _ in 0..2 {
787            assert_matches!(
788                generic_stream.recv().await,
789                Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
790                    assert_eq!(room_id, expected_room_id);
791                }
792            );
793        }
794        assert!(generic_stream.is_empty());
795
796        // The initial events contain one event because only the last chunk is loaded by
797        // default.
798        assert_eq!(thread_events.len(), 1);
799        assert_eq!(thread_events[0].event_id().unwrap(), thread_event_id_1);
800        assert!(thread_stream.is_empty());
801
802        // The thread knows all events in the storage though, even if they aren't
803        // loaded.
804        assert!(thread_event_cache.find_event(thread_event_id_0).await.unwrap().is_some());
805        assert!(thread_event_cache.find_event(thread_event_id_1).await.unwrap().is_some());
806
807        // Let's paginate to load more events.
808        thread_event_cache.pagination().run_backwards_once(20).await.unwrap();
809
810        assert_matches!(
811            thread_stream.recv().await,
812            Ok(TimelineVectorDiffs { diffs, .. }) => {
813                assert_eq!(diffs.len(), 1);
814                assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
815                    assert_eq!(event.event_id(), Some(thread_event_id_0));
816                });
817            }
818        );
819        assert!(thread_stream.is_empty());
820
821        // A generic update is triggered too.
822        assert_matches!(
823            generic_stream.recv().await,
824            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
825                assert_eq!(expected_room_id, room_id);
826            }
827        );
828        assert!(generic_stream.is_empty());
829
830        // A new update with one of these events leads to deduplication.
831        let timeline = Timeline { limited: false, prev_batch: None, events: vec![thread_event_1] };
832
833        thread_event_cache
834            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
835            .await
836            .unwrap();
837
838        // Just checking the generic update is correct. There is a duplicate event, so
839        // no generic changes whatsoever!
840        assert!(generic_stream.recv().now_or_never().is_none());
841
842        // The stream doesn't report these changes *yet*. Use the events vector given
843        // when subscribing, to check that the events correspond to their new
844        // positions. The duplicated item is removed (so it's not the first
845        // element anymore), and it's added to the back of the list.
846        let (thread_events, _) = thread_event_cache.subscribe().await.unwrap();
847        assert_eq!(thread_events.len(), 2);
848        assert_eq!(thread_events[0].event_id(), Some(thread_event_id_0));
849        assert_eq!(thread_events[1].event_id(), Some(thread_event_id_1));
850    }
851
852    #[async_test]
853    async fn test_load_from_storage_resilient_to_failure() {
854        let room_id = room_id!("!r0");
855        let f = EventFactory::new().room(room_id).sender(user_id!("@mnt_io:matrix.org"));
856
857        let event_cache_store = Arc::new(MemoryStore::new());
858
859        let thread_root = event_id!("$t0");
860        let thread_event_id_0 = event_id!("$t0_ev0");
861
862        let thread_event_0 = f
863            .text_msg("hello world")
864            .event_id(thread_event_id_0)
865            .in_thread(thread_root, thread_root)
866            .into_event();
867
868        // Prefill the store with invalid data: two chunks that form a cycle.
869        event_cache_store
870            .handle_linked_chunk_updates(
871                LinkedChunkId::Thread(room_id, thread_root),
872                vec![
873                    Update::NewItemsChunk {
874                        previous: None,
875                        new: ChunkIdentifier::new(0),
876                        next: None,
877                    },
878                    Update::PushItems {
879                        at: Position::new(ChunkIdentifier::new(0), 0),
880                        items: vec![thread_event_0],
881                    },
882                    Update::NewItemsChunk {
883                        previous: Some(ChunkIdentifier::new(0)),
884                        new: ChunkIdentifier::new(1),
885                        next: Some(ChunkIdentifier::new(0)),
886                    },
887                ],
888            )
889            .await
890            .unwrap();
891
892        let client = MockClientBuilder::new(None)
893            .on_builder(|builder| {
894                builder
895                    .store_config(
896                        StoreConfig::new(CrossProcessLockConfig::multi_process("holder"))
897                            .event_cache_store(event_cache_store.clone()),
898                    )
899                    .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
900            })
901            .build()
902            .await;
903
904        let event_cache = client.event_cache();
905        event_cache.subscribe().unwrap();
906
907        client.base_client().get_or_create_room(room_id, RoomState::Joined);
908
909        let (thread_event_cache, _drop_handles) =
910            event_cache.thread(room_id, thread_root).await.unwrap();
911        let (thread_events, _) = thread_event_cache.subscribe().await.unwrap();
912
913        // Because the persisted content was invalid, the thread store is reset:
914        // there are no events in the cache.
915        assert!(thread_events.is_empty());
916
917        // Storage doesn't contain anything. It would also be valid that it contains a
918        // single initial empty items chunk.
919        let raw_chunks = event_cache_store
920            .load_all_chunks(LinkedChunkId::Thread(room_id, thread_root))
921            .await
922            .unwrap();
923        assert!(raw_chunks.is_empty());
924    }
925
926    #[async_test]
927    async fn test_reload_when_dirty() {
928        let user_id = user_id!("@mnt_io:matrix.org");
929        let room_id = room_id!("!raclette:patate.ch");
930
931        // The storage shared by the two clients.
932        let event_cache_store = MemoryStore::new();
933
934        // Client for the process 0.
935        let client_p0 = MockClientBuilder::new(None)
936            .on_builder(|builder| {
937                builder
938                    .store_config(
939                        StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
940                            .event_cache_store(event_cache_store.clone()),
941                    )
942                    .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
943            })
944            .build()
945            .await;
946
947        // Client for the process 1.
948        let client_p1 = MockClientBuilder::new(None)
949            .on_builder(|builder| {
950                builder
951                    .store_config(
952                        StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
953                            .event_cache_store(event_cache_store),
954                    )
955                    .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
956            })
957            .build()
958            .await;
959
960        let event_factory = EventFactory::new().room(room_id).sender(user_id);
961
962        let thread_root = event_id!("$t0");
963        let thread_event_id_0 = event_id!("$t0_ev0");
964        let thread_event_id_1 = event_id!("$t0_ev1");
965
966        let thread_event_0 = event_factory
967            .text_msg("comté")
968            .event_id(thread_event_id_0)
969            .in_thread(thread_root, thread_root)
970            .into_event();
971        let thread_event_1 = event_factory
972            .text_msg("morbier")
973            .event_id(thread_event_id_1)
974            .in_thread(thread_root, thread_event_id_0)
975            .into_event();
976
977        // Add events to the storage (shared by the two clients!).
978        client_p0
979            .event_cache_store()
980            .lock()
981            .await
982            .expect("[p0] Could not acquire the event cache lock")
983            .as_clean()
984            .expect("[p0] Could not acquire a clean event cache lock")
985            .handle_linked_chunk_updates(
986                LinkedChunkId::Thread(room_id, thread_root),
987                vec![
988                    Update::NewItemsChunk {
989                        previous: None,
990                        new: ChunkIdentifier::new(0),
991                        next: None,
992                    },
993                    Update::PushItems {
994                        at: Position::new(ChunkIdentifier::new(0), 0),
995                        items: vec![thread_event_0],
996                    },
997                    Update::NewItemsChunk {
998                        previous: Some(ChunkIdentifier::new(0)),
999                        new: ChunkIdentifier::new(1),
1000                        next: None,
1001                    },
1002                    Update::PushItems {
1003                        at: Position::new(ChunkIdentifier::new(1), 0),
1004                        items: vec![thread_event_1],
1005                    },
1006                ],
1007            )
1008            .await
1009            .unwrap();
1010
1011        // Subscribe the event caches, and create the room.
1012        let (thread_event_cache_p0, thread_event_cache_p1) = {
1013            let event_cache_p0 = client_p0.event_cache();
1014            event_cache_p0.subscribe().unwrap();
1015
1016            let event_cache_p1 = client_p1.event_cache();
1017            event_cache_p1.subscribe().unwrap();
1018
1019            client_p0.base_client().get_or_create_room(room_id, RoomState::Joined);
1020            client_p1.base_client().get_or_create_room(room_id, RoomState::Joined);
1021
1022            let (thread_event_cache_p0, _drop_handles) =
1023                event_cache_p0.thread(room_id, thread_root).await.unwrap();
1024            let (thread_event_cache_p1, _drop_handles) =
1025                event_cache_p1.thread(room_id, thread_root).await.unwrap();
1026
1027            (thread_event_cache_p0, thread_event_cache_p1)
1028        };
1029
1030        // Okay. We are ready for the test!
1031        //
1032        // First off, let's check `thread_event_cache_p0` has access to the first event
1033        // loaded in-memory, then do a pagination, and see more events.
1034        let mut updates_stream_p0 = {
1035            let thread_event_cache = &thread_event_cache_p0;
1036
1037            let (initial_updates, mut updates_stream) =
1038                thread_event_cache_p0.subscribe().await.unwrap();
1039
1040            // Initial updates contain `thread_event_id_1` only.
1041            assert_eq!(initial_updates.len(), 1);
1042            assert_eq!(initial_updates[0].event_id(), Some(thread_event_id_1));
1043            assert!(updates_stream.is_empty());
1044
1045            // Load one more event with a backpagination.
1046            thread_event_cache.pagination().run_backwards_once(1).await.unwrap();
1047
1048            // A new update for `ev_id_0` must be present.
1049            assert_matches!(
1050                updates_stream.recv().await.unwrap(),
1051                TimelineVectorDiffs { diffs, .. } => {
1052                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
1053                    assert_matches!(
1054                        &diffs[0],
1055                        VectorDiff::Insert { index: 0, value: event } => {
1056                            assert_eq!(event.event_id(), Some(thread_event_id_0));
1057                        }
1058                    );
1059                }
1060            );
1061
1062            updates_stream
1063        };
1064
1065        // Second, let's check `thread_event_cache_p1` has the same accesses.
1066        let mut updates_stream_p1 = {
1067            let thread_event_cache = &thread_event_cache_p1;
1068            let (initial_updates, mut updates_stream) =
1069                thread_event_cache_p1.subscribe().await.unwrap();
1070
1071            // Initial updates contain `thread_event_id_1` only.
1072            assert_eq!(initial_updates.len(), 1);
1073            assert_eq!(initial_updates[0].event_id(), Some(thread_event_id_1));
1074            assert!(updates_stream.is_empty());
1075
1076            // Load one more event with a backpagination.
1077            thread_event_cache.pagination().run_backwards_once(1).await.unwrap();
1078
1079            // A new update for `thread_event_id_0` must be present.
1080            assert_matches!(
1081                updates_stream.recv().await.unwrap(),
1082                TimelineVectorDiffs { diffs, .. } => {
1083                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
1084                    assert_matches!(
1085                        &diffs[0],
1086                        VectorDiff::Insert { index: 0, value: event } => {
1087                            assert_eq!(event.event_id(), Some(thread_event_id_0));
1088                        }
1089                    );
1090                }
1091            );
1092
1093            updates_stream
1094        };
1095
1096        // Do this a couple times, for the fun.
1097        for _ in 0..3 {
1098            // Third, because `thread_event_cache_p1` has locked the store, the lock
1099            // is dirty for `thread_event_cache_p0`, so it will shrink to its last
1100            // chunk for the thread!
1101            {
1102                let thread_event_cache = &thread_event_cache_p0;
1103                let updates_stream = &mut updates_stream_p0;
1104
1105                // `thread_event_id_1` must be loaded in memory, just like before.
1106                // However, `thread_event_id_0` must NOT be loaded in memory. It WAS loaded, but
1107                // the state has been reloaded to its last chunk.
1108                let (initial_updates, _) = thread_event_cache.subscribe().await.unwrap();
1109
1110                assert_eq!(initial_updates.len(), 1);
1111                assert_eq!(initial_updates[0].event_id(), Some(thread_event_id_1));
1112
1113                // The reload can be observed via the updates too.
1114                assert_matches!(
1115                    updates_stream.recv().await.unwrap(),
1116                    TimelineVectorDiffs { diffs, .. } => {
1117                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
1118                        assert_matches!(&diffs[0], VectorDiff::Clear);
1119                        assert_matches!(
1120                            &diffs[1],
1121                            VectorDiff::Append { values: events } => {
1122                                assert_eq!(events.len(), 1);
1123                                assert_eq!(events[0].event_id(), Some(thread_event_id_1));
1124                            }
1125                        );
1126                    }
1127                );
1128
1129                // Load one more event with a backpagination.
1130                thread_event_cache.pagination().run_backwards_once(1).await.unwrap();
1131
1132                // `thread_event_id_0` must now be loaded in memory.
1133                // The pagination can be observed via the updates.
1134                assert_matches!(
1135                    updates_stream.recv().await.unwrap(),
1136                    TimelineVectorDiffs { diffs, .. } => {
1137                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
1138                        assert_matches!(
1139                            &diffs[0],
1140                            VectorDiff::Insert { index: 0, value: event } => {
1141                                assert_eq!(event.event_id(), Some(thread_event_id_0));
1142                            }
1143                        );
1144                    }
1145                );
1146            }
1147
1148            // Fourth, because `thread_event_cache_p0` has locked the store again, the lock
1149            // is dirty for `thread_event_cache_p1` too!, so it will shrink to its last
1150            // chunk for the thread!
1151            {
1152                let thread_event_cache = &thread_event_cache_p1;
1153                let updates_stream = &mut updates_stream_p1;
1154
1155                // `thread_event_id_1` must be loaded in memory, just like before.
1156                // However, `thread_event_id_0` must NOT be loaded in memory. It WAS loaded, but
1157                // the state has shrunk to its last chunk.
1158                let (initial_updates, _) = thread_event_cache.subscribe().await.unwrap();
1159
1160                assert_eq!(initial_updates.len(), 1);
1161                assert_eq!(initial_updates[0].event_id(), Some(thread_event_id_1));
1162
1163                // The reload can be observed via the updates too.
1164                assert_matches!(
1165                    updates_stream.recv().await.unwrap(),
1166                    TimelineVectorDiffs { diffs, .. } => {
1167                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
1168                        assert_matches!(&diffs[0], VectorDiff::Clear);
1169                        assert_matches!(
1170                            &diffs[1],
1171                            VectorDiff::Append { values: events } => {
1172                                assert_eq!(events.len(), 1);
1173                                assert_eq!(events[0].event_id(), Some(thread_event_id_1));
1174                            }
1175                        );
1176                    }
1177                );
1178
1179                // Load one more event with a backpagination.
1180                thread_event_cache.pagination().run_backwards_once(1).await.unwrap();
1181
1182                // `thread_event_id_0` must now be loaded in memory.
1183                // The pagination can be observed via the updates.
1184                assert_matches!(
1185                    updates_stream.recv().await.unwrap(),
1186                    TimelineVectorDiffs { diffs, .. } => {
1187                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
1188                        assert_matches!(
1189                            &diffs[0],
1190                            VectorDiff::Insert { index: 0, value: event } => {
1191                                assert_eq!(event.event_id(), Some(thread_event_id_0));
1192                            }
1193                        );
1194                    }
1195                );
1196            }
1197        }
1198    }
1199}