1use std::{
117 collections::{BTreeMap, BTreeSet},
118 pin::Pin,
119 sync::Weak,
120};
121
122use as_variant::as_variant;
123use futures_core::Stream;
124use futures_util::{StreamExt, future::join_all, pin_mut};
125#[cfg(doc)]
126use matrix_sdk_base::{BaseClient, crypto::OlmMachine};
127use matrix_sdk_base::{
128 crypto::{
129 store::types::{RoomKeyInfo, RoomKeyWithheldInfo},
130 types::events::room::encrypted::EncryptedEvent,
131 },
132 deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind},
133 event_cache::store::EventCacheStoreLockState,
134 locks::Mutex,
135 task_monitor::BackgroundTaskHandle,
136 timer,
137};
138#[cfg(doc)]
139use matrix_sdk_common::deserialized_responses::EncryptionInfo;
140use ruma::{
141 OwnedEventId, OwnedRoomId, RoomId,
142 events::{AnySyncTimelineEvent, room::encrypted::OriginalSyncRoomEncryptedEvent},
143 push::Action,
144 serde::Raw,
145};
146use tokio::sync::{
147 broadcast::{self, Sender},
148 mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
149};
150use tokio_stream::wrappers::{
151 BroadcastStream, UnboundedReceiverStream, errors::BroadcastStreamRecvError,
152};
153use tracing::{info, instrument, trace, warn};
154
155#[cfg(doc)]
156use super::RoomEventCache;
157use super::{
158 EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheGenericUpdate,
159 RoomEventCacheUpdate, TimelineVectorDiffs,
160 caches::room::{PostProcessingOrigin, RoomEventCacheLinkedChunkUpdate},
161};
162use crate::{Client, Result, Room, encryption::backups::BackupState, room::PushContext};
163
164type SessionId<'a> = &'a str;
165type OwnedSessionId = String;
166
167type EventIdAndUtd = (OwnedEventId, Raw<AnySyncTimelineEvent>);
168type EventIdAndEvent = (OwnedEventId, DecryptedRoomEvent);
169pub(in crate::event_cache) type ResolvedUtd =
170 (OwnedEventId, DecryptedRoomEvent, Option<Vec<Action>>);
171
172#[derive(Debug, Clone)]
175pub struct DecryptionRetryRequest {
176 pub room_id: OwnedRoomId,
178 pub utd_session_ids: BTreeSet<OwnedSessionId>,
180 pub refresh_info_session_ids: BTreeSet<OwnedSessionId>,
183}
184
185#[derive(Debug, Clone)]
187pub enum RedecryptorReport {
188 ResolvedUtds {
190 room_id: OwnedRoomId,
192 events: BTreeSet<OwnedEventId>,
194 },
195 Lagging,
198 BackupAvailable,
203}
204
205pub(super) struct RedecryptorChannels {
206 utd_reporter: Sender<RedecryptorReport>,
207 pub(super) decryption_request_sender: UnboundedSender<DecryptionRetryRequest>,
208 pub(super) decryption_request_receiver:
209 Mutex<Option<UnboundedReceiver<DecryptionRetryRequest>>>,
210}
211
212impl RedecryptorChannels {
213 pub(super) fn new() -> Self {
214 let (utd_reporter, _) = broadcast::channel(100);
215 let (decryption_request_sender, decryption_request_receiver) = unbounded_channel();
216
217 Self {
218 utd_reporter,
219 decryption_request_sender,
220 decryption_request_receiver: Mutex::new(Some(decryption_request_receiver)),
221 }
222 }
223}
224
225fn filter_timeline_event_to_utd(
230 event: TimelineEvent,
231) -> Option<(OwnedEventId, Raw<AnySyncTimelineEvent>)> {
232 let event_id = event.event_id();
233
234 let event = as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event);
237 event_id.zip(event)
240}
241
242fn filter_timeline_event_to_decrypted(
248 event: TimelineEvent,
249) -> Option<(OwnedEventId, DecryptedRoomEvent)> {
250 let event_id = event.event_id();
251
252 let event = as_variant!(event.kind, TimelineEventKind::Decrypted(event) => event);
253 event_id.zip(event)
256}
257
258impl EventCache {
259 async fn get_utds(
267 &self,
268 room_id: &RoomId,
269 session_id: SessionId<'_>,
270 ) -> Result<Vec<EventIdAndUtd>, EventCacheError> {
271 let events = match self.inner.store.lock().await? {
272 EventCacheStoreLockState::Clean(guard) | EventCacheStoreLockState::Dirty(guard) => {
277 guard.get_room_events(room_id, Some("m.room.encrypted"), Some(session_id)).await?
278 }
279 };
280
281 Ok(events.into_iter().filter_map(filter_timeline_event_to_utd).collect())
282 }
283
284 async fn get_utds_from_memory(&self) -> BTreeMap<OwnedRoomId, Vec<EventIdAndUtd>> {
287 let mut utds = BTreeMap::new();
288
289 for (room_id, caches) in self.inner.by_room.read().await.iter() {
290 let room_utds: Vec<_> = caches
291 .all_events()
292 .await
293 .into_iter()
294 .flatten()
295 .filter_map(filter_timeline_event_to_utd)
296 .collect();
297
298 utds.insert(room_id.to_owned(), room_utds);
299 }
300
301 utds
302 }
303
304 async fn get_decrypted_events(
305 &self,
306 room_id: &RoomId,
307 session_id: SessionId<'_>,
308 ) -> Result<Vec<EventIdAndEvent>, EventCacheError> {
309 let events = match self.inner.store.lock().await? {
310 EventCacheStoreLockState::Clean(guard) | EventCacheStoreLockState::Dirty(guard) => {
315 guard.get_room_events(room_id, None, Some(session_id)).await?
316 }
317 };
318
319 Ok(events.into_iter().filter_map(filter_timeline_event_to_decrypted).collect())
320 }
321
322 async fn get_decrypted_events_from_memory(
323 &self,
324 ) -> BTreeMap<OwnedRoomId, Vec<EventIdAndEvent>> {
325 let mut decrypted_events = BTreeMap::new();
326
327 for (room_id, caches) in self.inner.by_room.read().await.iter() {
328 let room_utds: Vec<_> = caches
329 .all_events()
330 .await
331 .into_iter()
332 .flatten()
333 .filter_map(filter_timeline_event_to_decrypted)
334 .collect();
335
336 decrypted_events.insert(room_id.to_owned(), room_utds);
337 }
338
339 decrypted_events
340 }
341
342 #[instrument(skip_all, fields(room_id))]
354 async fn on_resolved_utds(
355 &self,
356 room_id: &RoomId,
357 events: Vec<ResolvedUtd>,
358 ) -> Result<(), EventCacheError> {
359 if events.is_empty() {
360 trace!("No events were redecrypted or updated, nothing to replace");
361 return Ok(());
362 }
363
364 timer!("Resolving UTDs");
365
366 let (room_cache, _drop_handles) = self.for_room(room_id).await?;
368
369 let event_ids: BTreeSet<_> =
370 events.iter().cloned().map(|(event_id, _, _)| event_id).collect();
371
372 let (pinned_cache, ef_caches) = {
379 let mut state = room_cache.state().write().await?;
380
381 let pinned_cache = state.pinned_event_cache().cloned();
382 let ef_caches: Vec<_> = state.event_focused_caches().cloned().collect();
383
384 let mut new_events = Vec::with_capacity(events.len());
386 for (event_id, decrypted, actions) in &events {
387 if let Some((location, mut target_event)) = state.find_event(event_id).await? {
388 target_event.kind = TimelineEventKind::Decrypted(decrypted.clone());
389
390 if let Some(actions) = actions {
391 target_event.set_push_actions(actions.clone());
392 }
393
394 state.replace_event_at(location, target_event.clone()).await?;
397 new_events.push(target_event);
398 }
399 }
400
401 let receipt_event = None;
409
410 state
411 .post_process_new_events(
412 new_events,
413 PostProcessingOrigin::Redecryption,
414 receipt_event,
415 )
416 .await?;
417
418 room_cache.update_sender().send(
419 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
420 diffs: state.room_linked_chunk_mut().updates_as_vector_diffs(),
421 origin: EventsOrigin::Cache,
422 }),
423 Some(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() }),
424 );
425
426 (pinned_cache, ef_caches)
427 };
428 if let Some(pinned_cache) = pinned_cache {
434 pinned_cache.replace_utds(&events).await?;
435 }
436
437 join_all(ef_caches.iter().map(|cache| cache.replace_utds(&events))).await;
442
443 let report =
444 RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids };
445 let _ = self.inner.redecryption_channels.utd_reporter.send(report);
446
447 Ok(())
448 }
449
450 async fn decrypt_event(
452 &self,
453 room_id: &RoomId,
454 room: Option<&Room>,
455 push_context: Option<&PushContext>,
456 event: &Raw<EncryptedEvent>,
457 ) -> Option<(DecryptedRoomEvent, Option<Vec<Action>>)> {
458 if let Some(room) = room {
459 match room
460 .decrypt_event(
461 event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
462 push_context,
463 )
464 .await
465 {
466 Ok(maybe_decrypted) => {
467 let actions = maybe_decrypted.push_actions().map(|a| a.to_vec());
468
469 if let TimelineEventKind::Decrypted(decrypted) = maybe_decrypted.kind {
470 Some((decrypted, actions))
471 } else {
472 warn!(
473 "Failed to redecrypt an event despite receiving a room key or request to redecrypt"
474 );
475 None
476 }
477 }
478 Err(e) => {
479 warn!(
480 "Failed to redecrypt an event despite receiving a room key or request to redecrypt {e:?}"
481 );
482 None
483 }
484 }
485 } else {
486 let client = self.inner.client().ok()?;
487 let machine = client.olm_machine().await;
488 let machine = machine.as_ref()?;
489
490 match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await {
491 Ok(decrypted) => Some((decrypted, None)),
492 Err(e) => {
493 warn!(
494 "Failed to redecrypt an event despite receiving a room key or a request to redecrypt {e:?}"
495 );
496 None
497 }
498 }
499 }
500 }
501
502 #[instrument(skip_all, fields(room_id, session_id))]
505 async fn retry_decryption(
506 &self,
507 room_id: &RoomId,
508 session_id: SessionId<'_>,
509 ) -> Result<(), EventCacheError> {
510 let events = self.get_utds(room_id, session_id).await?;
512 self.retry_decryption_for_events(room_id, events).await
513 }
514
515 #[instrument(skip_all, fields(updates.linked_chunk_id))]
517 async fn retry_decryption_for_event_cache_updates(
518 &self,
519 updates: RoomEventCacheLinkedChunkUpdate,
520 ) -> Result<(), EventCacheError> {
521 let room_id = updates.linked_chunk_id.room_id();
522 let events: Vec<_> = updates
523 .updates
524 .into_iter()
525 .flat_map(|updates| updates.into_items())
526 .filter_map(filter_timeline_event_to_utd)
527 .collect();
528
529 self.retry_decryption_for_events(room_id, events).await
530 }
531
532 async fn retry_decryption_for_in_memory_events(&self) {
533 let utds = self.get_utds_from_memory().await;
534
535 for (room_id, utds) in utds.into_iter() {
536 if let Err(e) = self.retry_decryption_for_events(&room_id, utds).await {
537 warn!(%room_id, "Failed to redecrypt in-memory events {e:?}");
538 }
539 }
540 }
541
542 #[instrument(skip_all, fields(room_id, session_id))]
544 async fn retry_decryption_for_events(
545 &self,
546 room_id: &RoomId,
547 events: Vec<EventIdAndUtd>,
548 ) -> Result<(), EventCacheError> {
549 trace!("Retrying to decrypt");
550
551 if events.is_empty() {
552 trace!("No relevant events found.");
553 return Ok(());
554 }
555
556 let room = self.inner.client().ok().and_then(|client| client.get_room(room_id));
557 let push_context =
558 if let Some(room) = &room { room.push_context().await.ok().flatten() } else { None };
559
560 let mut decrypted_events = Vec::with_capacity(events.len());
562
563 for (event_id, event) in events {
564 if let Some((decrypted, actions)) = self
567 .decrypt_event(
568 room_id,
569 room.as_ref(),
570 push_context.as_ref(),
571 event.cast_ref_unchecked(),
572 )
573 .await
574 {
575 decrypted_events.push((event_id, decrypted, actions));
576 }
577 }
578
579 let event_ids: BTreeSet<_> =
580 decrypted_events.iter().map(|(event_id, _, _)| event_id).collect();
581
582 if !event_ids.is_empty() {
583 trace!(?event_ids, "Successfully redecrypted events");
584 }
585
586 self.on_resolved_utds(room_id, decrypted_events).await?;
589
590 Ok(())
591 }
592
593 async fn update_encryption_info_for_events(
595 &self,
596 room: &Room,
597 events: Vec<EventIdAndEvent>,
598 ) -> Result<(), EventCacheError> {
599 let mut updated_events = Vec::with_capacity(events.len());
601
602 for (event_id, mut event) in events {
603 if let Some(session_id) = event.encryption_info.session_id() {
604 let new_encryption_info =
605 room.get_encryption_info(session_id, &event.encryption_info.sender).await;
606
607 if let Some(new_encryption_info) = new_encryption_info
609 && event.encryption_info != new_encryption_info
610 {
611 event.encryption_info = new_encryption_info;
612 updated_events.push((event_id, event, None));
613 }
614 }
615 }
616
617 let event_ids: BTreeSet<_> =
618 updated_events.iter().map(|(event_id, _, _)| event_id).collect();
619
620 if !event_ids.is_empty() {
621 trace!(?event_ids, "Replacing the encryption info of some events");
622 }
623
624 self.on_resolved_utds(room.room_id(), updated_events).await
625 }
626
627 #[instrument(skip_all, fields(room_id, session_id))]
628 async fn update_encryption_info(
629 &self,
630 room_id: &RoomId,
631 session_id: SessionId<'_>,
632 ) -> Result<(), EventCacheError> {
633 trace!("Updating encryption info");
634
635 let Ok(client) = self.inner.client() else {
636 return Ok(());
637 };
638
639 let Some(room) = client.get_room(room_id) else {
640 return Ok(());
641 };
642
643 let events = self.get_decrypted_events(room_id, session_id).await?;
645
646 if events.is_empty() {
647 trace!("No relevant events found.");
648 return Ok(());
649 }
650
651 self.update_encryption_info_for_events(&room, events).await
653 }
654
655 async fn retry_update_encryption_info_for_in_memory_events(&self) {
656 let decrypted_events = self.get_decrypted_events_from_memory().await;
657
658 for (room_id, events) in decrypted_events.into_iter() {
659 let Some(room) = self.inner.client().ok().and_then(|c| c.get_room(&room_id)) else {
660 continue;
661 };
662
663 if let Err(e) = self.update_encryption_info_for_events(&room, events).await {
664 warn!(
665 %room_id,
666 "Failed to replace the encryption info for in-memory events {e:?}"
667 );
668 }
669 }
670 }
671
672 async fn retry_in_memory_events(&self) {
683 self.retry_decryption_for_in_memory_events().await;
684 self.retry_update_encryption_info_for_in_memory_events().await;
685 }
686
687 pub fn request_decryption(&self, request: DecryptionRetryRequest) {
728 let _ =
729 self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err(
730 |_| warn!("Requesting a decryption while the redecryption task has been shut down"),
731 );
732 }
733
734 pub fn subscribe_to_decryption_reports(
785 &self,
786 ) -> impl Stream<Item = Result<RedecryptorReport, BroadcastStreamRecvError>> {
787 BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe())
788 }
789}
790
791#[inline(always)]
792fn upgrade_event_cache(cache: &Weak<EventCacheInner>) -> Option<EventCache> {
793 cache.upgrade().map(|inner| EventCache { inner })
794}
795
796async fn send_report_and_retry_memory_events(
797 cache: &Weak<EventCacheInner>,
798 report: RedecryptorReport,
799) -> Result<(), ()> {
800 let Some(cache) = upgrade_event_cache(cache) else {
801 return Err(());
802 };
803
804 cache.retry_in_memory_events().await;
805 let _ = cache.inner.redecryption_channels.utd_reporter.send(report);
806
807 Ok(())
808}
809
810pub(crate) struct Redecryptor {
817 _task: BackgroundTaskHandle,
818}
819
820impl Redecryptor {
821 pub(super) fn new(
826 client: &Client,
827 cache: Weak<EventCacheInner>,
828 receiver: UnboundedReceiver<DecryptionRetryRequest>,
829 linked_chunk_update_sender: &Sender<RoomEventCacheLinkedChunkUpdate>,
830 ) -> Self {
831 let linked_chunk_stream = BroadcastStream::new(linked_chunk_update_sender.subscribe());
832 let backup_state_stream = client.encryption().backups().state_stream();
833
834 let task = client
835 .task_monitor()
836 .spawn_infinite_task("event_cache::redecryptor", async {
837 let request_redecryption_stream = UnboundedReceiverStream::new(receiver);
838
839 Self::listen_for_room_keys_task(
840 cache,
841 request_redecryption_stream,
842 linked_chunk_stream,
843 backup_state_stream,
844 )
845 .await;
846 })
847 .abort_on_drop();
848
849 Self { _task: task }
850 }
851
852 async fn subscribe_to_room_key_stream(
857 cache: &Weak<EventCacheInner>,
858 ) -> Option<(
859 impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>,
860 impl Stream<Item = Vec<RoomKeyWithheldInfo>>,
861 )> {
862 let event_cache = cache.upgrade()?;
863 let client = event_cache.client().ok()?;
864 let machine = client.olm_machine().await;
865
866 machine.as_ref().map(|m| {
867 (m.store().room_keys_received_stream(), m.store().room_keys_withheld_received_stream())
868 })
869 }
870
871 async fn redecryption_loop(
872 cache: &Weak<EventCacheInner>,
873 decryption_request_stream: &mut Pin<&mut impl Stream<Item = DecryptionRetryRequest>>,
874 events_stream: &mut Pin<
875 &mut impl Stream<Item = Result<RoomEventCacheLinkedChunkUpdate, BroadcastStreamRecvError>>,
876 >,
877 backup_state_stream: &mut Pin<
878 &mut impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
879 >,
880 ) -> bool {
881 let Some((room_key_stream, withheld_stream)) =
882 Self::subscribe_to_room_key_stream(cache).await
883 else {
884 return false;
885 };
886
887 pin_mut!(room_key_stream);
888 pin_mut!(withheld_stream);
889
890 loop {
891 tokio::select! {
892 Some(request) = decryption_request_stream.next() => {
895 let Some(cache) = upgrade_event_cache(cache) else {
896 break false;
897 };
898
899 trace!(?request, "Received a redecryption request");
900
901 for session_id in request.utd_session_ids {
902 let _ = cache
903 .retry_decryption(&request.room_id, &session_id)
904 .await
905 .inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}"));
906 }
907
908 for session_id in request.refresh_info_session_ids {
909 let _ = cache.update_encryption_info(&request.room_id, &session_id).await.inspect_err(|e|
910 warn!(
911 room_id = %request.room_id,
912 session_id = session_id,
913 "Unable to update the encryption info {e:?}",
914 ));
915 }
916 }
917 room_keys = room_key_stream.next() => {
920 match room_keys {
921 Some(Ok(room_keys)) => {
922 let Some(cache) = upgrade_event_cache(cache) else {
926 break false;
927 };
928
929 trace!(?room_keys, "Received new room keys");
930
931 for key in &room_keys {
932 let _ = cache
933 .retry_decryption(&key.room_id, &key.session_id)
934 .await
935 .inspect_err(|e| warn!("Error redecrypting {e:?}"));
936 }
937
938 for key in room_keys {
939 let _ = cache.update_encryption_info(&key.room_id, &key.session_id).await.inspect_err(|e|
940 warn!(
941 room_id = %key.room_id,
942 session_id = key.session_id,
943 "Unable to update the encryption info {e:?}",
944 ));
945 }
946 },
947 Some(Err(_)) => {
948 warn!("The room key stream lagged, reporting the lag to our listeners");
955
956 if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
957 break false;
958 }
959 },
960 None => {
963 break true;
964 }
965 }
966 }
967 withheld_info = withheld_stream.next() => {
968 match withheld_info {
969 Some(infos) => {
970 let Some(cache) = upgrade_event_cache(cache) else {
971 break false;
972 };
973
974 trace!(?infos, "Received new withheld infos");
975
976 for RoomKeyWithheldInfo { room_id, session_id, .. } in &infos {
977 let _ = cache.update_encryption_info(room_id, session_id).await.inspect_err(|e|
978 warn!(
979 room_id = %room_id,
980 session_id = session_id,
981 "Unable to update the encryption info {e:?}",
982 ));
983 }
984 }
985 None => break true,
988 }
989 }
990 Some(event_updates) = events_stream.next() => {
994 match event_updates {
995 Ok(updates) => {
996 let Some(cache) = upgrade_event_cache(cache) else {
997 break false;
998 };
999
1000 let linked_chunk_id = updates.linked_chunk_id.to_owned();
1001
1002 let _ = cache.retry_decryption_for_event_cache_updates(updates).await.inspect_err(|e|
1003 warn!(
1004 %linked_chunk_id,
1005 "Unable to handle UTDs from event cache updates {e:?}",
1006 )
1007 );
1008 }
1009 Err(_) => {
1010 if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
1011 break false;
1012 }
1013 }
1014 }
1015 }
1016 Some(backup_state_update) = backup_state_stream.next() => {
1017 match backup_state_update {
1018 Ok(state) => {
1019 match state {
1020 BackupState::Unknown |
1021 BackupState::Creating |
1022 BackupState::Enabling |
1023 BackupState::Resuming |
1024 BackupState::Downloading |
1025 BackupState::Disabling =>{
1026 }
1029 BackupState::Enabled => {
1030 if send_report_and_retry_memory_events(cache, RedecryptorReport::BackupAvailable).await.is_err() {
1035 break false;
1036 }
1037 }
1038 }
1039 }
1040 Err(_) => {
1041 if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
1042 break false;
1043 }
1044 }
1045 }
1046 }
1047 else => break false,
1048 }
1049 }
1050 }
1051
1052 async fn listen_for_room_keys_task(
1053 cache: Weak<EventCacheInner>,
1054 decryption_request_stream: UnboundedReceiverStream<DecryptionRetryRequest>,
1055 events_stream: BroadcastStream<RoomEventCacheLinkedChunkUpdate>,
1056 backup_state_stream: impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
1057 ) {
1058 pin_mut!(decryption_request_stream);
1062 pin_mut!(events_stream);
1063 pin_mut!(backup_state_stream);
1064
1065 while Self::redecryption_loop(
1066 &cache,
1067 &mut decryption_request_stream,
1068 &mut events_stream,
1069 &mut backup_state_stream,
1070 )
1071 .await
1072 {
1073 info!("Regenerating the re-decryption streams");
1074
1075 if send_report_and_retry_memory_events(&cache, RedecryptorReport::Lagging)
1078 .await
1079 .is_err()
1080 {
1081 break;
1082 }
1083 }
1084
1085 info!("Shutting down the event cache redecryptor");
1086 }
1087}
1088
1089#[cfg(not(target_family = "wasm"))]
1090#[cfg(test)]
1091mod tests {
1092 use std::{
1093 collections::BTreeSet,
1094 sync::{
1095 Arc,
1096 atomic::{AtomicBool, Ordering},
1097 },
1098 time::Duration,
1099 };
1100
1101 use assert_matches2::assert_matches;
1102 use async_trait::async_trait;
1103 use eyeball_im::VectorDiff;
1104 use matrix_sdk_base::{
1105 cross_process_lock::CrossProcessLockGeneration,
1106 crypto::types::events::{ToDeviceEvent, room::encrypted::ToDeviceEncryptedEventContent},
1107 deserialized_responses::{TimelineEventKind, VerificationState},
1108 event_cache::{
1109 Event, Gap,
1110 store::{EventCacheStore, EventCacheStoreError, MemoryStore},
1111 },
1112 linked_chunk::{
1113 ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
1114 RawChunk, Update,
1115 },
1116 locks::Mutex,
1117 sleep::sleep,
1118 store::StoreConfig,
1119 timeout::timeout,
1120 };
1121 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
1122 use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
1123 use ruma::{
1124 EventId, OwnedEventId, RoomId, RoomVersionId, device_id, event_id,
1125 events::{AnySyncTimelineEvent, relation::RelationType},
1126 room_id,
1127 serde::Raw,
1128 user_id,
1129 };
1130 use serde_json::json;
1131 use tokio::sync::oneshot::{self, Sender};
1132 use tracing::{Instrument, info};
1133
1134 use crate::{
1135 Client, assert_let_timeout,
1136 encryption::EncryptionSettings,
1137 event_cache::{
1138 DecryptionRetryRequest, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
1139 TimelineVectorDiffs,
1140 },
1141 test_utils::mocks::MatrixMockServer,
1142 };
1143
1144 #[derive(Debug, Clone)]
1149 struct DelayingStore {
1150 memory_store: MemoryStore,
1151 delaying: Arc<AtomicBool>,
1152 foo: Arc<Mutex<Option<Sender<()>>>>,
1153 }
1154
1155 impl DelayingStore {
1156 fn new() -> Self {
1157 Self {
1158 memory_store: MemoryStore::new(),
1159 delaying: AtomicBool::new(true).into(),
1160 foo: Arc::new(Mutex::new(None)),
1161 }
1162 }
1163
1164 async fn stop_delaying(&self) {
1165 let (sender, receiver) = oneshot::channel();
1166
1167 {
1168 *self.foo.lock() = Some(sender);
1169 }
1170
1171 self.delaying.store(false, Ordering::SeqCst);
1172
1173 receiver.await.expect("We should be able to receive a response")
1174 }
1175 }
1176
1177 #[cfg_attr(target_family = "wasm", async_trait(?Send))]
1178 #[cfg_attr(not(target_family = "wasm"), async_trait)]
1179 impl EventCacheStore for DelayingStore {
1180 type Error = EventCacheStoreError;
1181
1182 async fn try_take_leased_lock(
1183 &self,
1184 lease_duration_ms: u32,
1185 key: &str,
1186 holder: &str,
1187 ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
1188 self.memory_store.try_take_leased_lock(lease_duration_ms, key, holder).await
1189 }
1190
1191 async fn handle_linked_chunk_updates(
1192 &self,
1193 linked_chunk_id: LinkedChunkId<'_>,
1194 updates: Vec<Update<Event, Gap>>,
1195 ) -> Result<(), Self::Error> {
1196 while self.delaying.load(Ordering::SeqCst) {
1202 sleep(Duration::from_millis(10)).await;
1203 }
1204
1205 let sender = self.foo.lock().take();
1206 let ret = self.memory_store.handle_linked_chunk_updates(linked_chunk_id, updates).await;
1207
1208 if let Some(sender) = sender {
1209 sender.send(()).expect("We should be able to notify the other side that we're done with the storage operation");
1210 }
1211
1212 ret
1213 }
1214
1215 async fn load_all_chunks(
1216 &self,
1217 linked_chunk_id: LinkedChunkId<'_>,
1218 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
1219 self.memory_store.load_all_chunks(linked_chunk_id).await
1220 }
1221
1222 async fn load_all_chunks_metadata(
1223 &self,
1224 linked_chunk_id: LinkedChunkId<'_>,
1225 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
1226 self.memory_store.load_all_chunks_metadata(linked_chunk_id).await
1227 }
1228
1229 async fn load_last_chunk(
1230 &self,
1231 linked_chunk_id: LinkedChunkId<'_>,
1232 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
1233 self.memory_store.load_last_chunk(linked_chunk_id).await
1234 }
1235
1236 async fn load_previous_chunk(
1237 &self,
1238 linked_chunk_id: LinkedChunkId<'_>,
1239 before_chunk_identifier: ChunkIdentifier,
1240 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
1241 self.memory_store.load_previous_chunk(linked_chunk_id, before_chunk_identifier).await
1242 }
1243
1244 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
1245 self.memory_store.clear_all_linked_chunks().await
1246 }
1247
1248 async fn filter_duplicated_events(
1249 &self,
1250 linked_chunk_id: LinkedChunkId<'_>,
1251 events: Vec<OwnedEventId>,
1252 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
1253 self.memory_store.filter_duplicated_events(linked_chunk_id, events).await
1254 }
1255
1256 async fn find_event(
1257 &self,
1258 room_id: &RoomId,
1259 event_id: &EventId,
1260 ) -> Result<Option<Event>, Self::Error> {
1261 self.memory_store.find_event(room_id, event_id).await
1262 }
1263
1264 async fn find_event_relations(
1265 &self,
1266 room_id: &RoomId,
1267 event_id: &EventId,
1268 filters: Option<&[RelationType]>,
1269 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1270 self.memory_store.find_event_relations(room_id, event_id, filters).await
1271 }
1272
1273 async fn get_room_events(
1274 &self,
1275 room_id: &RoomId,
1276 event_type: Option<&str>,
1277 session_id: Option<&str>,
1278 ) -> Result<Vec<Event>, Self::Error> {
1279 self.memory_store.get_room_events(room_id, event_type, session_id).await
1280 }
1281
1282 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1283 self.memory_store.save_event(room_id, event).await
1284 }
1285
1286 async fn optimize(&self) -> Result<(), Self::Error> {
1287 self.memory_store.optimize().await
1288 }
1289
1290 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1291 self.memory_store.get_size().await
1292 }
1293 }
1294
1295 async fn set_up_clients(
1296 room_id: &RoomId,
1297 alice_enables_cross_signing: bool,
1298 use_delayed_store: bool,
1299 ) -> (Client, Client, MatrixMockServer, Option<DelayingStore>) {
1300 let alice_span = tracing::info_span!("alice");
1301 let bob_span = tracing::info_span!("bob");
1302
1303 let alice_user_id = user_id!("@alice:localhost");
1304 let alice_device_id = device_id!("ALICEDEVICE");
1305 let bob_user_id = user_id!("@bob:localhost");
1306 let bob_device_id = device_id!("BOBDEVICE");
1307
1308 let matrix_mock_server = MatrixMockServer::new().await;
1309 matrix_mock_server.mock_crypto_endpoints_preset().await;
1310
1311 let encryption_settings = EncryptionSettings {
1312 auto_enable_cross_signing: alice_enables_cross_signing,
1313 ..Default::default()
1314 };
1315
1316 let alice = matrix_mock_server
1319 .client_builder_for_crypto_end_to_end(alice_user_id, alice_device_id)
1320 .on_builder(|builder| {
1321 builder
1322 .with_enable_share_history_on_invite(true)
1323 .with_encryption_settings(encryption_settings)
1324 })
1325 .build()
1326 .instrument(alice_span.clone())
1327 .await;
1328
1329 let encryption_settings =
1330 EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() };
1331
1332 let (store_config, store) = if use_delayed_store {
1333 let store = DelayingStore::new();
1334
1335 (
1336 StoreConfig::new(CrossProcessLockConfig::multi_process(
1337 "delayed_store_event_cache_test",
1338 ))
1339 .event_cache_store(store.clone()),
1340 Some(store),
1341 )
1342 } else {
1343 (
1344 StoreConfig::new(CrossProcessLockConfig::multi_process(
1345 "normal_store_event_cache_test",
1346 )),
1347 None,
1348 )
1349 };
1350
1351 let bob = matrix_mock_server
1352 .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id)
1353 .on_builder(|builder| {
1354 builder
1355 .with_enable_share_history_on_invite(true)
1356 .with_encryption_settings(encryption_settings)
1357 .store_config(store_config)
1358 })
1359 .build()
1360 .instrument(bob_span.clone())
1361 .await;
1362
1363 bob.event_cache().subscribe().expect("Bob should be able to enable the event cache");
1364
1365 matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await;
1367
1368 let event_factory = EventFactory::new().room(room_id).sender(alice_user_id);
1369
1370 let room_builder = JoinedRoomBuilder::new(room_id)
1372 .add_state_event(event_factory.create(alice_user_id, RoomVersionId::V1))
1373 .add_state_event(event_factory.room_encryption());
1374
1375 matrix_mock_server
1376 .mock_sync()
1377 .ok_and_run(&alice, |builder| {
1378 builder.add_joined_room(room_builder.clone());
1379 })
1380 .instrument(alice_span)
1381 .await;
1382
1383 matrix_mock_server
1384 .mock_sync()
1385 .ok_and_run(&bob, |builder| {
1386 builder.add_joined_room(room_builder);
1387 })
1388 .instrument(bob_span)
1389 .await;
1390
1391 (alice, bob, matrix_mock_server, store)
1392 }
1393
1394 async fn prepare_room(
1395 matrix_mock_server: &MatrixMockServer,
1396 event_factory: &EventFactory,
1397 alice: &Client,
1398 bob: &Client,
1399 room_id: &RoomId,
1400 ) -> (Raw<AnySyncTimelineEvent>, Raw<ToDeviceEvent<ToDeviceEncryptedEventContent>>) {
1401 let alice_user_id = alice.user_id().unwrap();
1402 let bob_user_id = bob.user_id().unwrap();
1403
1404 let alice_member_event = event_factory.member(alice_user_id).into_raw();
1405 let bob_member_event = event_factory.member(bob_user_id).into_raw();
1406
1407 let room = alice
1408 .get_room(room_id)
1409 .expect("Alice should have access to the room now that we synced");
1410
1411 let event_type = "m.room.message";
1416 let content = json!({"body": "It's a secret to everybody", "msgtype": "m.text"});
1417
1418 let event_id = event_id!("$some_id");
1419 let (event_receiver, mock) =
1420 matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id);
1421 let (_guard, room_key) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await;
1422
1423 {
1424 let _guard = mock.mock_once().mount_as_scoped().await;
1425
1426 matrix_mock_server
1427 .mock_get_members()
1428 .ok(vec![alice_member_event.clone(), bob_member_event.clone()])
1429 .mock_once()
1430 .mount()
1431 .await;
1432
1433 room.send_raw(event_type, content)
1434 .await
1435 .expect("We should be able to send an initial message");
1436 };
1437
1438 let event = event_receiver.await.expect("Alice should have sent the event by now");
1440 let room_key = room_key.await;
1441
1442 (event, room_key)
1443 }
1444
1445 #[async_test]
1446 async fn test_redecryptor() {
1447 let room_id = room_id!("!test:localhost");
1448
1449 let event_factory = EventFactory::new().room(room_id);
1450 let (alice, bob, matrix_mock_server, _) = set_up_clients(room_id, true, false).await;
1451
1452 let (event, room_key) =
1453 prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;
1454
1455 let event_cache = bob.event_cache();
1458 let (room_cache, _) = event_cache
1459 .for_room(room_id)
1460 .await
1461 .expect("We should be able to get to the event cache for a specific room");
1462
1463 let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
1464 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1465
1466 bob.inner
1469 .base_client
1470 .regenerate_olm(None)
1471 .await
1472 .expect("We should be able to regenerate the Olm machine");
1473
1474 matrix_mock_server
1476 .mock_sync()
1477 .ok_and_run(&bob, |builder| {
1478 builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
1479 })
1480 .await;
1481
1482 assert_let_timeout!(
1485 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1486 subscriber.recv()
1487 );
1488
1489 assert_eq!(diffs.len(), 1);
1492 assert_matches!(&diffs[0], VectorDiff::Append { values });
1493 assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });
1494
1495 assert_let_timeout!(
1496 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1497 );
1498 assert_eq!(expected_room_id, room_id);
1499 assert!(generic_stream.is_empty());
1500
1501 matrix_mock_server
1503 .mock_sync()
1504 .ok_and_run(&bob, |builder| {
1505 builder.add_to_device_event(
1506 room_key
1507 .deserialize_as()
1508 .expect("We should be able to deserialize the room key"),
1509 );
1510 })
1511 .await;
1512
1513 assert_let_timeout!(
1515 Duration::from_secs(1),
1516 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1517 subscriber.recv()
1518 );
1519
1520 assert_eq!(diffs.len(), 1);
1522 assert_matches!(&diffs[0], VectorDiff::Set { index, value });
1523 assert_eq!(*index, 0);
1524 assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
1525
1526 assert_let_timeout!(
1527 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1528 );
1529 assert_eq!(expected_room_id, room_id);
1530 assert!(generic_stream.is_empty());
1531 }
1532
1533 #[async_test]
1534 async fn test_redecryptor_updating_encryption_info() {
1535 let bob_span = tracing::info_span!("bob");
1536
1537 let room_id = room_id!("!test:localhost");
1538
1539 let event_factory = EventFactory::new().room(room_id);
1540 let (alice, bob, matrix_mock_server, _) = set_up_clients(room_id, false, false).await;
1541
1542 let (event, room_key) =
1543 prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;
1544
1545 let event_cache = bob.event_cache();
1548 let (room_cache, _) = event_cache
1549 .for_room(room_id)
1550 .instrument(bob_span.clone())
1551 .await
1552 .expect("We should be able to get to the event cache for a specific room");
1553
1554 let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
1555 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1556
1557 matrix_mock_server
1559 .mock_sync()
1560 .ok_and_run(&bob, |builder| {
1561 builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
1562 })
1563 .instrument(bob_span.clone())
1564 .await;
1565
1566 assert_let_timeout!(
1569 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1570 subscriber.recv()
1571 );
1572
1573 assert_eq!(diffs.len(), 1);
1576 assert_matches!(&diffs[0], VectorDiff::Append { values });
1577 assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });
1578
1579 assert_let_timeout!(
1580 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1581 );
1582 assert_eq!(expected_room_id, room_id);
1583 assert!(generic_stream.is_empty());
1584
1585 matrix_mock_server
1587 .mock_sync()
1588 .ok_and_run(&bob, |builder| {
1589 builder.add_to_device_event(
1590 room_key
1591 .deserialize_as()
1592 .expect("We should be able to deserialize the room key"),
1593 );
1594 })
1595 .instrument(bob_span.clone())
1596 .await;
1597
1598 assert_let_timeout!(
1600 Duration::from_secs(1),
1601 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1602 subscriber.recv()
1603 );
1604
1605 assert_eq!(diffs.len(), 1);
1607 assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value });
1608 assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
1609
1610 let encryption_info = value.encryption_info().unwrap();
1611 assert_matches!(&encryption_info.verification_state, VerificationState::Unverified(_));
1612
1613 assert_let_timeout!(
1614 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1615 );
1616 assert_eq!(expected_room_id, room_id);
1617 assert!(generic_stream.is_empty());
1618
1619 let session_id = encryption_info.session_id().unwrap().to_owned();
1620 let alice_user_id = alice.user_id().unwrap();
1621
1622 alice
1624 .encryption()
1625 .bootstrap_cross_signing(None)
1626 .await
1627 .expect("Alice should be able to create the cross-signing keys");
1628
1629 bob.update_tracked_users_for_testing([alice_user_id]).instrument(bob_span.clone()).await;
1630 matrix_mock_server
1631 .mock_sync()
1632 .ok_and_run(&bob, |builder| {
1633 builder.add_change_device(alice_user_id);
1634 })
1635 .instrument(bob_span.clone())
1636 .await;
1637
1638 bob.event_cache().request_decryption(DecryptionRetryRequest {
1639 room_id: room_id.into(),
1640 utd_session_ids: BTreeSet::new(),
1641 refresh_info_session_ids: BTreeSet::from([session_id]),
1642 });
1643
1644 assert_let_timeout!(
1647 Duration::from_secs(1),
1648 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1649 subscriber.recv()
1650 );
1651
1652 assert_eq!(diffs.len(), 1);
1653 assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value });
1654 assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
1655 let encryption_info = value.encryption_info().unwrap();
1656
1657 assert_matches!(
1658 &encryption_info.verification_state,
1659 VerificationState::Unverified(_),
1660 "The event should now know about the identity but still be unverified"
1661 );
1662
1663 assert_let_timeout!(
1664 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1665 );
1666 assert_eq!(expected_room_id, room_id);
1667 assert!(generic_stream.is_empty());
1668 }
1669
1670 #[async_test]
1671 async fn test_event_is_redecrypted_even_if_key_arrives_while_event_processing() {
1672 let room_id = room_id!("!test:localhost");
1673
1674 let event_factory = EventFactory::new().room(room_id);
1675 let (alice, bob, matrix_mock_server, delayed_store) =
1676 set_up_clients(room_id, true, true).await;
1677
1678 let delayed_store = delayed_store.unwrap();
1679
1680 let (event, room_key) =
1681 prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;
1682
1683 let event_cache = bob.event_cache();
1684
1685 let (room_cache, _) = event_cache
1687 .for_room(room_id)
1688 .await
1689 .expect("We should be able to get to the event cache for a specific room");
1690
1691 let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
1692 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1693
1694 matrix_mock_server
1696 .mock_sync()
1697 .ok_and_run(&bob, |builder| {
1698 builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
1699 })
1700 .await;
1701
1702 matrix_mock_server
1704 .mock_sync()
1705 .ok_and_run(&bob, |builder| {
1706 builder.add_to_device_event(
1707 room_key
1708 .deserialize_as()
1709 .expect("We should be able to deserialize the room key"),
1710 );
1711 })
1712 .await;
1713
1714 info!("Stopping the delay");
1715 delayed_store.stop_delaying().await;
1716
1717 assert_let_timeout!(
1724 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1725 subscriber.recv()
1726 );
1727
1728 assert_eq!(diffs.len(), 1);
1731 assert_matches!(&diffs[0], VectorDiff::Append { values });
1732 assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });
1733
1734 assert_let_timeout!(
1735 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1736 );
1737 assert_eq!(expected_room_id, room_id);
1738
1739 assert_let_timeout!(
1741 Duration::from_secs(1),
1742 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1743 subscriber.recv()
1744 );
1745
1746 assert_eq!(diffs.len(), 1);
1748 assert_matches!(&diffs[0], VectorDiff::Set { index, value });
1749 assert_eq!(*index, 0);
1750 assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
1751
1752 assert_let_timeout!(
1753 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1754 );
1755 assert_eq!(expected_room_id, room_id);
1756 assert!(generic_stream.is_empty());
1757 }
1758
1759 #[async_test]
1763 async fn test_redecryptor_no_deadlock_with_event_focused_cache_pagination() {
1764 use crate::{
1765 event_cache::EventFocusThreadMode,
1766 test_utils::mocks::{RoomContextResponseTemplate, RoomMessagesResponseTemplate},
1767 };
1768
1769 let room_id = room_id!("!test:localhost");
1770 let f = EventFactory::new().room(room_id);
1771 let (alice, bob, server, _) = set_up_clients(room_id, true, false).await;
1772
1773 let (encrypted_event, room_key) = prepare_room(&server, &f, &alice, &bob, room_id).await;
1774
1775 let event_cache = bob.event_cache();
1776 let (room_cache, _drop_handles) = event_cache
1777 .for_room(room_id)
1778 .await
1779 .expect("Bob should have an event cache for the room");
1780
1781 let (_initial_events, mut subscriber) = room_cache.subscribe().await.unwrap();
1782
1783 server
1785 .mock_sync()
1786 .ok_and_run(&bob, |builder| {
1787 builder.add_joined_room(
1788 JoinedRoomBuilder::new(room_id).add_timeline_event(encrypted_event),
1789 );
1790 })
1791 .await;
1792
1793 assert_let_timeout!(
1795 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1796 subscriber.recv()
1797 );
1798 assert_eq!(diffs.len(), 1);
1799 assert_matches!(&diffs[0], VectorDiff::Append { values });
1800 assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });
1801
1802 let focused_event_id = event_id!("$focused");
1804 let bob_user_id = bob.user_id().unwrap();
1805
1806 server
1807 .mock_room_event_context()
1808 .expect_any_access_token()
1809 .ok(RoomContextResponseTemplate::new(
1810 f.text_msg("focused msg")
1811 .sender(bob_user_id)
1812 .event_id(focused_event_id)
1813 .into_event(),
1814 )
1815 .start("back-token"))
1816 .mock_once()
1817 .mount()
1818 .await;
1819
1820 let event_focused_cache = room_cache
1821 .get_or_create_event_focused_cache(
1822 focused_event_id.to_owned(),
1823 20,
1824 EventFocusThreadMode::Automatic,
1825 )
1826 .await
1827 .unwrap();
1828
1829 server
1831 .mock_room_messages()
1832 .expect_any_access_token()
1833 .ok(RoomMessagesResponseTemplate::default().with_delay(Duration::from_secs(5)))
1834 .mock_once()
1835 .mount()
1836 .await;
1837
1838 let event_focused_cache_clone = event_focused_cache.clone();
1841 let pagination_task = tokio::spawn(async move {
1842 let _ = event_focused_cache_clone.paginate_backwards(20).await;
1843 });
1844
1845 sleep(Duration::from_millis(200)).await;
1847
1848 server
1856 .mock_sync()
1857 .ok_and_run(&bob, |builder| {
1858 builder.add_to_device_event(
1859 room_key
1860 .deserialize_as()
1861 .expect("We should be able to deserialize the room key"),
1862 );
1863 })
1864 .await;
1865
1866 sleep(Duration::from_secs(1)).await;
1868
1869 let (_events, _subscriber) = timeout(room_cache.subscribe(), Duration::from_millis(100))
1872 .await
1873 .expect("subscribing shouldn't timeout")
1874 .expect("subscribing should succeed");
1875
1876 pagination_task.abort();
1877 }
1878}