1pub mod pagination;
18mod state;
19mod updates;
20
21use std::{fmt, sync::Arc};
22
23use matrix_sdk_base::{
24 event_cache::Event,
25 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
26};
27use ruma::{
28 EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, events::relation::RelationType,
29 room_version_rules::RoomVersionRules,
30};
31use tokio::sync::{
32 Notify,
33 broadcast::{Receiver, Sender},
34};
35use tracing::{instrument, trace};
36
37use self::pagination::ThreadPagination;
38pub(in super::super) use self::state::ThreadEventCacheState;
39pub(super) use self::updates::ThreadEventCacheUpdateSender;
40#[cfg(feature = "e2e-encryption")]
41use super::super::redecryptor::ResolvedUtd;
42use super::{
43 super::{
44 Result,
45 states::{CacheStateLock, StateLock, selectors::ThreadStateSelector},
46 },
47 EventsOrigin, TimelineVectorDiffs,
48 room::{RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate},
49};
50use crate::room::WeakRoom;
51
52#[derive(Clone)]
56pub struct ThreadEventCache {
57 inner: Arc<ThreadEventCacheInner>,
58}
59
60struct ThreadEventCacheInner {
62 room_id: OwnedRoomId,
64
65 thread_id: OwnedEventId,
67
68 weak_room: WeakRoom,
70
71 state: CacheStateLock<ThreadStateSelector>,
73
74 pagination_batch_token_notifier: Notify,
76
77 update_sender: ThreadEventCacheUpdateSender,
79}
80
81impl fmt::Debug for ThreadEventCache {
82 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83 f.debug_struct("ThreadEventCache").finish_non_exhaustive()
84 }
85}
86
87impl ThreadEventCache {
88 #[allow(clippy::too_many_arguments)]
90 pub(super) async fn new(
91 room_id: OwnedRoomId,
92 thread_id: OwnedEventId,
93 own_user_id: OwnedUserId,
94 room_version_rules: RoomVersionRules,
95 weak_room: WeakRoom,
96 state: &StateLock,
97 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
98 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
99 ) -> Result<Self> {
100 let update_sender = ThreadEventCacheUpdateSender::new(generic_update_sender.clone());
101
102 let cache_state = state
103 .try_insert_once_with(
104 ThreadStateSelector::new(room_id.clone(), thread_id.clone()),
105 |store_guard| {
106 ThreadEventCacheState::new(
107 room_id.clone(),
108 thread_id.clone(),
109 own_user_id,
110 room_version_rules,
111 store_guard,
112 update_sender.clone(),
113 linked_chunk_update_sender,
114 )
115 },
116 )
117 .await?;
118
119 let timeline_is_not_empty =
120 cache_state.read().await?.thread_linked_chunk().revents().next().is_some();
121
122 let cache = Self {
123 inner: Arc::new(ThreadEventCacheInner {
124 room_id: room_id.clone(),
125 thread_id,
126 weak_room,
127 state: cache_state,
128 pagination_batch_token_notifier: Notify::new(),
129 update_sender,
130 }),
131 };
132
133 if timeline_is_not_empty {
136 let _ = generic_update_sender
137 .send(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
138 }
139
140 Ok(cache)
141 }
142
143 pub fn room_id(&self) -> &RoomId {
145 &self.inner.room_id
146 }
147
148 pub fn thread_id(&self) -> &EventId {
150 &self.inner.thread_id
151 }
152
153 pub async fn subscribe(&self) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
155 let state = self.inner.state.read().await?;
156
157 let events =
158 state.thread_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
159
160 let recv = state.update_sender.new_thread_receiver();
161
162 Ok((events, recv))
163 }
164
165 pub fn pagination(&self) -> ThreadPagination {
168 ThreadPagination::new(self.inner.clone())
169 }
170
171 pub(in super::super) fn state(&self) -> &CacheStateLock<ThreadStateSelector> {
173 &self.inner.state
174 }
175
176 #[instrument(skip_all, fields(room_id = %self.inner.room_id, thread_root = %self.inner.thread_id))]
178 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
179 self.handle_timeline(updates.timeline).await?;
180
181 Ok(())
182 }
183
184 #[instrument(skip_all, fields(room_id = %self.inner.room_id, thread_root = %self.inner.thread_id))]
186 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
187 self.handle_timeline(updates.timeline).await?;
188
189 Ok(())
190 }
191
192 async fn handle_timeline(&self, timeline: Timeline) -> Result<()> {
195 if timeline.events.is_empty() && timeline.prev_batch.is_none() {
196 return Ok(());
197 }
198
199 trace!("adding new events");
200
201 let mut state = self.inner.state.write().await?;
202
203 let (stored_prev_batch_token, timeline_event_diffs) = state.handle_sync(timeline).await?;
204
205 if stored_prev_batch_token {
208 self.inner.pagination_batch_token_notifier.notify_one();
209 }
210
211 if !timeline_event_diffs.is_empty() {
212 state.update_sender.send(
213 TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Sync },
214 None,
217 );
218 }
219
220 Ok(())
221 }
222
223 pub(super) async fn find_event(
228 &self,
229 event_id: &EventId,
230 ) -> Result<Option<(super::EventLocation, Event)>> {
231 self.inner.state.read().await?.find_event(event_id).await
232 }
233
234 pub async fn find_event_with_relations(
247 &self,
248 event_id: &EventId,
249 filter: Option<Vec<RelationType>>,
250 ) -> Result<Option<(Event, Vec<Event>)>> {
251 Ok(self
253 .inner
254 .state
255 .read()
256 .await?
257 .find_event_with_relations(event_id, filter)
258 .await
259 .ok()
260 .flatten())
261 }
262
263 #[cfg(feature = "e2e-encryption")]
269 pub(in super::super) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<bool> {
270 let mut state = self.inner.state.write().await?;
271 let timeline_event_diffs = state.replace_utds(events).await?;
272
273 Ok(
274 if let Some(timeline_event_diffs) = timeline_event_diffs
275 && !timeline_event_diffs.is_empty()
276 {
277 state.update_sender.send(
278 TimelineVectorDiffs {
279 diffs: timeline_event_diffs,
280 origin: EventsOrigin::Cache,
281 },
282 Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
283 );
284
285 true
286 } else {
287 false
288 },
289 )
290 }
291}
292
293#[cfg(all(test, not(target_family = "wasm")))] mod timed_tests {
295 use std::sync::Arc;
296
297 use assert_matches::assert_matches;
298 use assert_matches2::assert_let;
299 use eyeball_im::VectorDiff;
300 use futures_util::FutureExt as _;
301 use matrix_sdk_base::{
302 RoomState, ThreadingSupport,
303 cross_process_lock::CrossProcessLockConfig,
304 event_cache::{
305 Gap,
306 store::{EventCacheStore as _, MemoryStore},
307 },
308 linked_chunk::{
309 ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
310 lazy_loader::from_all_chunks,
311 },
312 store::StoreConfig,
313 sync::{JoinedRoomUpdate, Timeline},
314 };
315 use matrix_sdk_test::{async_test, event_factory::EventFactory};
316 use ruma::{
317 event_id,
318 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
319 room_id, user_id,
320 };
321
322 use super::super::{super::RoomEventCacheGenericUpdate, TimelineVectorDiffs};
323 use crate::test_utils::client::MockClientBuilder;
324
325 #[async_test]
326 async fn test_write_to_storage() {
327 let room_id = room_id!("!r0");
328 let thread_root = event_id!("$t0_ev0");
329 let thread_event_id_0 = event_id!("$t0_ev1");
330
331 let f = EventFactory::new().room(room_id).sender(user_id!("@mnt_io:matrix.org"));
332
333 let event_cache_store = Arc::new(MemoryStore::new());
334
335 let client = MockClientBuilder::new(None)
336 .on_builder(|builder| {
337 builder
338 .store_config(
339 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
340 .event_cache_store(event_cache_store.clone()),
341 )
342 .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
343 })
344 .build()
345 .await;
346
347 let event_cache = client.event_cache();
348 event_cache.subscribe().unwrap();
349
350 client.base_client().get_or_create_room(room_id, RoomState::Joined);
351
352 let (thread_event_cache, _drop_handles) =
353 event_cache.thread(room_id, thread_root).await.unwrap();
354 let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
355
356 assert!(thread_events.is_empty());
357
358 let timeline = Timeline {
360 limited: true,
361 prev_batch: Some("raclette".to_owned()),
362 events: vec![
363 f.text_msg("salut")
364 .event_id(thread_event_id_0)
365 .in_thread(thread_root, thread_root)
366 .into_event(),
367 ],
368 };
369
370 thread_event_cache
371 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
372 .await
373 .unwrap();
374
375 assert_matches!(
376 thread_stream.recv().await,
377 Ok(TimelineVectorDiffs { diffs, .. }) => {
378 assert_eq!(diffs.len(), 2);
379 assert_matches!(&diffs[0], VectorDiff::Clear);
380 assert_matches!(&diffs[1], VectorDiff::Append { values: events } => {
381 assert_eq!(events.len(), 1);
382 assert_eq!(events[0].event_id(), Some(thread_event_id_0));
383 });
384 }
385 );
386 assert!(thread_stream.is_empty());
387
388 let linked_chunk = from_all_chunks::<3, _, _>(
390 event_cache_store
391 .load_all_chunks(LinkedChunkId::Thread(room_id, thread_root))
392 .await
393 .unwrap(),
394 )
395 .unwrap()
396 .unwrap();
397
398 assert_eq!(linked_chunk.chunks().count(), 2);
399
400 let mut chunks = linked_chunk.chunks();
401
402 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
404 assert_eq!(gap.token, "raclette");
405 });
406
407 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
409 assert_eq!(events.len(), 1);
410 assert_eq!(events[0].event_id(), Some(thread_event_id_0));
411 });
412
413 assert!(chunks.next().is_none());
415 }
416
417 #[async_test]
418 async fn test_write_to_storage_strips_bundled_relations() {
419 let sender = user_id!("@mnt_io:matrix.org");
420 let room_id = room_id!("!r0");
421 let thread_root = event_id!("$t0_ev0");
422 let thread_event_id_0 = event_id!("$t0_ev1");
423
424 let f = EventFactory::new().room(room_id).sender(sender);
425
426 let event_cache_store = Arc::new(MemoryStore::new());
427
428 let client = MockClientBuilder::new(None)
429 .on_builder(|builder| {
430 builder
431 .store_config(
432 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
433 .event_cache_store(event_cache_store.clone()),
434 )
435 .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
436 })
437 .build()
438 .await;
439
440 let event_cache = client.event_cache();
441 event_cache.subscribe().unwrap();
442
443 client.base_client().get_or_create_room(room_id, RoomState::Joined);
444
445 let (thread_event_cache, _drop_handles) =
446 event_cache.thread(room_id, thread_root).await.unwrap();
447
448 let timeline = Timeline {
450 limited: false,
451 prev_batch: None,
452 events: vec![
453 f.text_msg("s 'up")
454 .event_id(thread_event_id_0)
455 .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(sender))
456 .in_thread(thread_root, thread_root)
457 .into_event(),
458 ],
459 };
460
461 thread_event_cache
462 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
463 .await
464 .unwrap();
465
466 {
468 let (events, _) = thread_event_cache.subscribe().await.unwrap();
469
470 assert_eq!(events.len(), 1);
471
472 let event = events[0].raw().deserialize().unwrap();
473 assert_eq!(event.event_id(), thread_event_id_0);
474 assert_let!(
475 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) =
476 event
477 );
478 assert!(msg.as_original().unwrap().unsigned.relations.replace.is_some());
479 }
480
481 let linked_chunk = from_all_chunks::<3, _, _>(
483 event_cache_store
484 .load_all_chunks(LinkedChunkId::Thread(room_id, thread_root))
485 .await
486 .unwrap(),
487 )
488 .unwrap()
489 .unwrap();
490
491 assert_eq!(linked_chunk.chunks().count(), 1);
492
493 let mut chunks = linked_chunk.chunks();
494 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
495 assert_eq!(events.len(), 1);
496
497 let event = events[0].raw().deserialize().unwrap();
498 assert_eq!(event.event_id(), thread_event_id_0);
499
500 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = event);
501 assert!(msg.as_original().unwrap().unsigned.relations.replace.is_none());
502 });
503
504 assert!(chunks.next().is_none());
506 }
507
508 #[async_test]
509 async fn test_clear() {
510 let room_id = room_id!("!r0");
511 let f = EventFactory::new().room(room_id).sender(user_id!("@mnt_io:matrix.org"));
512
513 let event_cache_store = Arc::new(MemoryStore::new());
514
515 let thread_root = event_id!("$t0_ev0");
516 let thread_event_id_0 = event_id!("$t0_ev1");
517 let thread_event_id_1 = event_id!("$t0_ev2");
518
519 let thread_event_0 = f
520 .text_msg("foo")
521 .event_id(thread_event_id_0)
522 .in_thread(thread_root, thread_root)
523 .into_event();
524 let thread_event_1 = f
525 .text_msg("bar")
526 .event_id(thread_event_id_1)
527 .in_thread(thread_root, thread_event_id_0)
528 .into_event();
529
530 event_cache_store
532 .handle_linked_chunk_updates(
533 LinkedChunkId::Thread(room_id, thread_root),
534 vec![
535 Update::NewItemsChunk {
537 previous: None,
538 new: ChunkIdentifier::new(0),
539 next: None,
540 },
541 Update::NewGapChunk {
543 previous: Some(ChunkIdentifier::new(0)),
544 new: ChunkIdentifier::new(42),
546 next: None,
547 gap: Gap { token: "comté".to_owned() },
548 },
549 Update::NewItemsChunk {
551 previous: Some(ChunkIdentifier::new(42)),
552 new: ChunkIdentifier::new(1),
553 next: None,
554 },
555 Update::PushItems {
556 at: Position::new(ChunkIdentifier::new(1), 0),
557 items: vec![thread_event_0.clone()],
558 },
559 Update::NewItemsChunk {
561 previous: Some(ChunkIdentifier::new(1)),
562 new: ChunkIdentifier::new(2),
563 next: None,
564 },
565 Update::PushItems {
566 at: Position::new(ChunkIdentifier::new(2), 0),
567 items: vec![thread_event_1.clone()],
568 },
569 ],
570 )
571 .await
572 .unwrap();
573
574 let client = MockClientBuilder::new(None)
575 .on_builder(|builder| {
576 builder
577 .store_config(
578 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
579 .event_cache_store(event_cache_store.clone()),
580 )
581 .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
582 })
583 .build()
584 .await;
585
586 let event_cache = client.event_cache();
587 event_cache.subscribe().unwrap();
588
589 client.base_client().get_or_create_room(room_id, RoomState::Joined);
590
591 let (thread_event_cache, _drop_handles) =
592 event_cache.thread(room_id, thread_root).await.unwrap();
593 let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
594
595 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
596
597 {
599 assert!(thread_event_cache.find_event(thread_event_id_0).await.unwrap().is_some());
600 assert!(thread_event_cache.find_event(thread_event_id_1).await.unwrap().is_some());
601 }
602
603 {
605 assert_eq!(thread_events.len(), 1);
608 assert_eq!(thread_events[0].event_id().unwrap(), thread_event_id_1);
609
610 assert!(thread_stream.is_empty());
611 }
612
613 {
615 thread_event_cache.pagination().run_backwards_once(20).await.unwrap();
616
617 assert_matches!(
618 thread_stream.recv().await,
619 Ok(TimelineVectorDiffs { diffs, .. }) => {
620 assert_eq!(diffs.len(), 1);
621 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
622 assert_eq!(event.event_id(), Some(thread_event_id_0));
624 });
625 }
626 );
627 assert!(thread_stream.is_empty());
628
629 assert_matches!(
630 generic_stream.recv().await,
631 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
632 assert_eq!(room_id, expected_room_id);
633 }
634 );
635 assert!(generic_stream.is_empty());
636 }
637
638 event_cache.clear_all_rooms().await.unwrap();
640
641 assert_matches!(
643 thread_stream.recv().await,
644 Ok(TimelineVectorDiffs { diffs, .. }) => {
645 assert_eq!(diffs.len(), 2);
646 assert_matches!(&diffs[0], VectorDiff::Clear);
647 assert_matches!(&diffs[1], VectorDiff::Append { values } => {
648 assert!(values.is_empty());
649 });
650 }
651 );
652
653 assert_matches!(
656 generic_stream.recv().await,
657 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) => {
658 assert_eq!(received_room_id, room_id);
659 }
660 );
661 assert_matches!(
663 generic_stream.recv().await,
664 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) => {
665 assert_eq!(received_room_id, room_id);
666 }
667 );
668 assert!(generic_stream.is_empty());
669
670 assert!(thread_event_cache.find_event(thread_event_id_0).await.unwrap().is_none());
673 assert!(thread_event_cache.find_event(thread_event_id_1).await.unwrap().is_none());
674
675 let (thread_events, _) = thread_event_cache.subscribe().await.unwrap();
677 assert!(thread_events.is_empty());
678
679 let linked_chunk = from_all_chunks::<3, _, _>(
681 event_cache_store
682 .load_all_chunks(LinkedChunkId::Thread(room_id, thread_root))
683 .await
684 .unwrap(),
685 )
686 .unwrap()
687 .unwrap();
688
689 assert_eq!(linked_chunk.num_items(), 0);
693 }
694
695 #[async_test]
696 async fn test_load_from_storage() {
697 let room_id = room_id!("!r0");
698 let f = EventFactory::new().room(room_id).sender(user_id!("@mnt_io:matrix.org"));
699
700 let event_cache_store = Arc::new(MemoryStore::new());
701
702 let thread_root = event_id!("$t0");
703 let thread_event_id_0 = event_id!("$t0_ev0");
704 let thread_event_id_1 = event_id!("$t0_ev1");
705
706 let thread_event_0 = f
707 .text_msg("hello world")
708 .event_id(thread_event_id_0)
709 .in_thread(thread_root, thread_root)
710 .into_event();
711 let thread_event_1 = f
712 .text_msg("how's it going")
713 .event_id(thread_event_id_1)
714 .in_thread(thread_root, thread_event_id_1)
715 .into_event();
716
717 let updates = vec![
721 Update::NewItemsChunk { previous: None, new: ChunkIdentifier::new(0), next: None },
723 Update::NewGapChunk {
725 previous: Some(ChunkIdentifier::new(0)),
726 new: ChunkIdentifier::new(42),
728 next: None,
729 gap: Gap { token: "gruyère".to_owned() },
730 },
731 Update::NewItemsChunk {
733 previous: Some(ChunkIdentifier::new(42)),
734 new: ChunkIdentifier::new(1),
735 next: None,
736 },
737 Update::PushItems {
738 at: Position::new(ChunkIdentifier::new(1), 0),
739 items: vec![thread_event_0.clone()],
740 },
741 Update::NewItemsChunk {
743 previous: Some(ChunkIdentifier::new(1)),
744 new: ChunkIdentifier::new(2),
745 next: None,
746 },
747 Update::PushItems {
748 at: Position::new(ChunkIdentifier::new(2), 0),
749 items: vec![thread_event_1.clone()],
750 },
751 ];
752 event_cache_store
753 .handle_linked_chunk_updates(LinkedChunkId::Room(room_id), updates.clone())
754 .await
755 .unwrap();
756 event_cache_store
757 .handle_linked_chunk_updates(LinkedChunkId::Thread(room_id, thread_root), updates)
758 .await
759 .unwrap();
760
761 let client = MockClientBuilder::new(None)
762 .on_builder(|builder| {
763 builder
764 .store_config(
765 StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
766 .event_cache_store(event_cache_store.clone()),
767 )
768 .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
769 })
770 .build()
771 .await;
772
773 let event_cache = client.event_cache();
774 event_cache.subscribe().unwrap();
775
776 client.base_client().get_or_create_room(room_id, RoomState::Joined);
777
778 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
780 let (thread_event_cache, _drop_handles) =
781 event_cache.thread(room_id, thread_root).await.unwrap();
782 let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
783
784 for _ in 0..2 {
787 assert_matches!(
788 generic_stream.recv().await,
789 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
790 assert_eq!(room_id, expected_room_id);
791 }
792 );
793 }
794 assert!(generic_stream.is_empty());
795
796 assert_eq!(thread_events.len(), 1);
799 assert_eq!(thread_events[0].event_id().unwrap(), thread_event_id_1);
800 assert!(thread_stream.is_empty());
801
802 assert!(thread_event_cache.find_event(thread_event_id_0).await.unwrap().is_some());
805 assert!(thread_event_cache.find_event(thread_event_id_1).await.unwrap().is_some());
806
807 thread_event_cache.pagination().run_backwards_once(20).await.unwrap();
809
810 assert_matches!(
811 thread_stream.recv().await,
812 Ok(TimelineVectorDiffs { diffs, .. }) => {
813 assert_eq!(diffs.len(), 1);
814 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
815 assert_eq!(event.event_id(), Some(thread_event_id_0));
816 });
817 }
818 );
819 assert!(thread_stream.is_empty());
820
821 assert_matches!(
823 generic_stream.recv().await,
824 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
825 assert_eq!(expected_room_id, room_id);
826 }
827 );
828 assert!(generic_stream.is_empty());
829
830 let timeline = Timeline { limited: false, prev_batch: None, events: vec![thread_event_1] };
832
833 thread_event_cache
834 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
835 .await
836 .unwrap();
837
838 assert!(generic_stream.recv().now_or_never().is_none());
841
842 let (thread_events, _) = thread_event_cache.subscribe().await.unwrap();
847 assert_eq!(thread_events.len(), 2);
848 assert_eq!(thread_events[0].event_id(), Some(thread_event_id_0));
849 assert_eq!(thread_events[1].event_id(), Some(thread_event_id_1));
850 }
851
852 #[async_test]
853 async fn test_load_from_storage_resilient_to_failure() {
854 let room_id = room_id!("!r0");
855 let f = EventFactory::new().room(room_id).sender(user_id!("@mnt_io:matrix.org"));
856
857 let event_cache_store = Arc::new(MemoryStore::new());
858
859 let thread_root = event_id!("$t0");
860 let thread_event_id_0 = event_id!("$t0_ev0");
861
862 let thread_event_0 = f
863 .text_msg("hello world")
864 .event_id(thread_event_id_0)
865 .in_thread(thread_root, thread_root)
866 .into_event();
867
868 event_cache_store
870 .handle_linked_chunk_updates(
871 LinkedChunkId::Thread(room_id, thread_root),
872 vec![
873 Update::NewItemsChunk {
874 previous: None,
875 new: ChunkIdentifier::new(0),
876 next: None,
877 },
878 Update::PushItems {
879 at: Position::new(ChunkIdentifier::new(0), 0),
880 items: vec![thread_event_0],
881 },
882 Update::NewItemsChunk {
883 previous: Some(ChunkIdentifier::new(0)),
884 new: ChunkIdentifier::new(1),
885 next: Some(ChunkIdentifier::new(0)),
886 },
887 ],
888 )
889 .await
890 .unwrap();
891
892 let client = MockClientBuilder::new(None)
893 .on_builder(|builder| {
894 builder
895 .store_config(
896 StoreConfig::new(CrossProcessLockConfig::multi_process("holder"))
897 .event_cache_store(event_cache_store.clone()),
898 )
899 .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
900 })
901 .build()
902 .await;
903
904 let event_cache = client.event_cache();
905 event_cache.subscribe().unwrap();
906
907 client.base_client().get_or_create_room(room_id, RoomState::Joined);
908
909 let (thread_event_cache, _drop_handles) =
910 event_cache.thread(room_id, thread_root).await.unwrap();
911 let (thread_events, _) = thread_event_cache.subscribe().await.unwrap();
912
913 assert!(thread_events.is_empty());
916
917 let raw_chunks = event_cache_store
920 .load_all_chunks(LinkedChunkId::Thread(room_id, thread_root))
921 .await
922 .unwrap();
923 assert!(raw_chunks.is_empty());
924 }
925
926 #[async_test]
927 async fn test_reload_when_dirty() {
928 let user_id = user_id!("@mnt_io:matrix.org");
929 let room_id = room_id!("!raclette:patate.ch");
930
931 let event_cache_store = MemoryStore::new();
933
934 let client_p0 = MockClientBuilder::new(None)
936 .on_builder(|builder| {
937 builder
938 .store_config(
939 StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
940 .event_cache_store(event_cache_store.clone()),
941 )
942 .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
943 })
944 .build()
945 .await;
946
947 let client_p1 = MockClientBuilder::new(None)
949 .on_builder(|builder| {
950 builder
951 .store_config(
952 StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
953 .event_cache_store(event_cache_store),
954 )
955 .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
956 })
957 .build()
958 .await;
959
960 let event_factory = EventFactory::new().room(room_id).sender(user_id);
961
962 let thread_root = event_id!("$t0");
963 let thread_event_id_0 = event_id!("$t0_ev0");
964 let thread_event_id_1 = event_id!("$t0_ev1");
965
966 let thread_event_0 = event_factory
967 .text_msg("comté")
968 .event_id(thread_event_id_0)
969 .in_thread(thread_root, thread_root)
970 .into_event();
971 let thread_event_1 = event_factory
972 .text_msg("morbier")
973 .event_id(thread_event_id_1)
974 .in_thread(thread_root, thread_event_id_0)
975 .into_event();
976
977 client_p0
979 .event_cache_store()
980 .lock()
981 .await
982 .expect("[p0] Could not acquire the event cache lock")
983 .as_clean()
984 .expect("[p0] Could not acquire a clean event cache lock")
985 .handle_linked_chunk_updates(
986 LinkedChunkId::Thread(room_id, thread_root),
987 vec![
988 Update::NewItemsChunk {
989 previous: None,
990 new: ChunkIdentifier::new(0),
991 next: None,
992 },
993 Update::PushItems {
994 at: Position::new(ChunkIdentifier::new(0), 0),
995 items: vec![thread_event_0],
996 },
997 Update::NewItemsChunk {
998 previous: Some(ChunkIdentifier::new(0)),
999 new: ChunkIdentifier::new(1),
1000 next: None,
1001 },
1002 Update::PushItems {
1003 at: Position::new(ChunkIdentifier::new(1), 0),
1004 items: vec![thread_event_1],
1005 },
1006 ],
1007 )
1008 .await
1009 .unwrap();
1010
1011 let (thread_event_cache_p0, thread_event_cache_p1) = {
1013 let event_cache_p0 = client_p0.event_cache();
1014 event_cache_p0.subscribe().unwrap();
1015
1016 let event_cache_p1 = client_p1.event_cache();
1017 event_cache_p1.subscribe().unwrap();
1018
1019 client_p0.base_client().get_or_create_room(room_id, RoomState::Joined);
1020 client_p1.base_client().get_or_create_room(room_id, RoomState::Joined);
1021
1022 let (thread_event_cache_p0, _drop_handles) =
1023 event_cache_p0.thread(room_id, thread_root).await.unwrap();
1024 let (thread_event_cache_p1, _drop_handles) =
1025 event_cache_p1.thread(room_id, thread_root).await.unwrap();
1026
1027 (thread_event_cache_p0, thread_event_cache_p1)
1028 };
1029
1030 let mut updates_stream_p0 = {
1035 let thread_event_cache = &thread_event_cache_p0;
1036
1037 let (initial_updates, mut updates_stream) =
1038 thread_event_cache_p0.subscribe().await.unwrap();
1039
1040 assert_eq!(initial_updates.len(), 1);
1042 assert_eq!(initial_updates[0].event_id(), Some(thread_event_id_1));
1043 assert!(updates_stream.is_empty());
1044
1045 thread_event_cache.pagination().run_backwards_once(1).await.unwrap();
1047
1048 assert_matches!(
1050 updates_stream.recv().await.unwrap(),
1051 TimelineVectorDiffs { diffs, .. } => {
1052 assert_eq!(diffs.len(), 1, "{diffs:#?}");
1053 assert_matches!(
1054 &diffs[0],
1055 VectorDiff::Insert { index: 0, value: event } => {
1056 assert_eq!(event.event_id(), Some(thread_event_id_0));
1057 }
1058 );
1059 }
1060 );
1061
1062 updates_stream
1063 };
1064
1065 let mut updates_stream_p1 = {
1067 let thread_event_cache = &thread_event_cache_p1;
1068 let (initial_updates, mut updates_stream) =
1069 thread_event_cache_p1.subscribe().await.unwrap();
1070
1071 assert_eq!(initial_updates.len(), 1);
1073 assert_eq!(initial_updates[0].event_id(), Some(thread_event_id_1));
1074 assert!(updates_stream.is_empty());
1075
1076 thread_event_cache.pagination().run_backwards_once(1).await.unwrap();
1078
1079 assert_matches!(
1081 updates_stream.recv().await.unwrap(),
1082 TimelineVectorDiffs { diffs, .. } => {
1083 assert_eq!(diffs.len(), 1, "{diffs:#?}");
1084 assert_matches!(
1085 &diffs[0],
1086 VectorDiff::Insert { index: 0, value: event } => {
1087 assert_eq!(event.event_id(), Some(thread_event_id_0));
1088 }
1089 );
1090 }
1091 );
1092
1093 updates_stream
1094 };
1095
1096 for _ in 0..3 {
1098 {
1102 let thread_event_cache = &thread_event_cache_p0;
1103 let updates_stream = &mut updates_stream_p0;
1104
1105 let (initial_updates, _) = thread_event_cache.subscribe().await.unwrap();
1109
1110 assert_eq!(initial_updates.len(), 1);
1111 assert_eq!(initial_updates[0].event_id(), Some(thread_event_id_1));
1112
1113 assert_matches!(
1115 updates_stream.recv().await.unwrap(),
1116 TimelineVectorDiffs { diffs, .. } => {
1117 assert_eq!(diffs.len(), 2, "{diffs:#?}");
1118 assert_matches!(&diffs[0], VectorDiff::Clear);
1119 assert_matches!(
1120 &diffs[1],
1121 VectorDiff::Append { values: events } => {
1122 assert_eq!(events.len(), 1);
1123 assert_eq!(events[0].event_id(), Some(thread_event_id_1));
1124 }
1125 );
1126 }
1127 );
1128
1129 thread_event_cache.pagination().run_backwards_once(1).await.unwrap();
1131
1132 assert_matches!(
1135 updates_stream.recv().await.unwrap(),
1136 TimelineVectorDiffs { diffs, .. } => {
1137 assert_eq!(diffs.len(), 1, "{diffs:#?}");
1138 assert_matches!(
1139 &diffs[0],
1140 VectorDiff::Insert { index: 0, value: event } => {
1141 assert_eq!(event.event_id(), Some(thread_event_id_0));
1142 }
1143 );
1144 }
1145 );
1146 }
1147
1148 {
1152 let thread_event_cache = &thread_event_cache_p1;
1153 let updates_stream = &mut updates_stream_p1;
1154
1155 let (initial_updates, _) = thread_event_cache.subscribe().await.unwrap();
1159
1160 assert_eq!(initial_updates.len(), 1);
1161 assert_eq!(initial_updates[0].event_id(), Some(thread_event_id_1));
1162
1163 assert_matches!(
1165 updates_stream.recv().await.unwrap(),
1166 TimelineVectorDiffs { diffs, .. } => {
1167 assert_eq!(diffs.len(), 2, "{diffs:#?}");
1168 assert_matches!(&diffs[0], VectorDiff::Clear);
1169 assert_matches!(
1170 &diffs[1],
1171 VectorDiff::Append { values: events } => {
1172 assert_eq!(events.len(), 1);
1173 assert_eq!(events[0].event_id(), Some(thread_event_id_1));
1174 }
1175 );
1176 }
1177 );
1178
1179 thread_event_cache.pagination().run_backwards_once(1).await.unwrap();
1181
1182 assert_matches!(
1185 updates_stream.recv().await.unwrap(),
1186 TimelineVectorDiffs { diffs, .. } => {
1187 assert_eq!(diffs.len(), 1, "{diffs:#?}");
1188 assert_matches!(
1189 &diffs[0],
1190 VectorDiff::Insert { index: 0, value: event } => {
1191 assert_eq!(event.event_id(), Some(thread_event_id_0));
1192 }
1193 );
1194 }
1195 );
1196 }
1197 }
1198 }
1199}