1use std::{
117 collections::{BTreeMap, BTreeSet},
118 pin::Pin,
119 sync::Weak,
120};
121
122use as_variant::as_variant;
123use futures_core::Stream;
124use futures_util::{StreamExt, future::join_all, pin_mut};
125#[cfg(doc)]
126use matrix_sdk_base::{BaseClient, crypto::OlmMachine};
127use matrix_sdk_base::{
128 crypto::{
129 store::types::{RoomKeyInfo, RoomKeyWithheldInfo},
130 types::events::room::encrypted::EncryptedEvent,
131 },
132 deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind},
133 event_cache::store::EventCacheStoreLockState,
134 locks::Mutex,
135 task_monitor::BackgroundTaskHandle,
136 timer,
137};
138#[cfg(doc)]
139use matrix_sdk_common::deserialized_responses::EncryptionInfo;
140use ruma::{
141 OwnedEventId, OwnedRoomId, RoomId,
142 events::{AnySyncTimelineEvent, room::encrypted::OriginalSyncRoomEncryptedEvent},
143 push::Action,
144 serde::Raw,
145};
146use tokio::sync::{
147 broadcast::{self, Sender},
148 mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
149};
150use tokio_stream::wrappers::{
151 BroadcastStream, UnboundedReceiverStream, errors::BroadcastStreamRecvError,
152};
153use tracing::{info, instrument, trace, warn};
154
155#[cfg(doc)]
156use super::RoomEventCache;
157use super::{
158 EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheGenericUpdate,
159 RoomEventCacheUpdate, TimelineVectorDiffs,
160 caches::room::{PostProcessingOrigin, RoomEventCacheLinkedChunkUpdate},
161};
162use crate::{Client, Result, Room, encryption::backups::BackupState, room::PushContext};
163
/// Borrowed identifier of the session an event was encrypted with, as used
/// to filter events in the store.
type SessionId<'a> = &'a str;
/// Owned counterpart of [`SessionId`].
type OwnedSessionId = String;

/// An event ID together with the raw event that could not be decrypted.
type EventIdAndUtd = (OwnedEventId, Raw<AnySyncTimelineEvent>);
/// An event ID together with the decrypted form of that event.
type EventIdAndEvent = (OwnedEventId, DecryptedRoomEvent);
/// An event ID, the decrypted event that should replace the cached one, and
/// the push actions to set on the replaced event, if any.
pub(in crate::event_cache) type ResolvedUtd =
    (OwnedEventId, DecryptedRoomEvent, Option<Vec<Action>>);
171
/// A request, sent to the redecryptor task, to revisit events of a room.
#[derive(Debug, Clone)]
pub struct DecryptionRetryRequest {
    /// The room the session IDs below belong to.
    pub room_id: OwnedRoomId,
    /// Session IDs for which decryption of stored undecryptable events
    /// should be retried.
    pub utd_session_ids: BTreeSet<OwnedSessionId>,
    /// Session IDs for which the encryption info of already decrypted
    /// events should be recomputed.
    pub refresh_info_session_ids: BTreeSet<OwnedSessionId>,
}
184
/// A report broadcast by the redecryptor to interested listeners.
#[derive(Debug, Clone)]
pub enum RedecryptorReport {
    /// Events in a room were replaced in the cache, either because they
    /// were decrypted or because their encryption info changed.
    ResolvedUtds {
        /// The room the events belong to.
        room_id: OwnedRoomId,
        /// The IDs of the events that were replaced.
        events: BTreeSet<OwnedEventId>,
    },
    /// One of the streams the redecryptor listens to lagged or had to be
    /// regenerated; the in-memory events have been retried.
    Lagging,
    /// The backup state switched to enabled; the in-memory events have been
    /// retried.
    BackupAvailable,
}
204
/// The channels used to talk to, and hear back from, the redecryptor task.
pub(super) struct RedecryptorChannels {
    /// Broadcast sender on which [`RedecryptorReport`]s are published.
    utd_reporter: Sender<RedecryptorReport>,
    /// Sender used to submit [`DecryptionRetryRequest`]s.
    pub(super) decryption_request_sender: UnboundedSender<DecryptionRetryRequest>,
    /// Receiving end of the request channel.
    // NOTE(review): wrapped in `Mutex<Option<_>>` presumably so that it can be
    // taken exactly once when the redecryptor task is created — confirm
    // against the caller.
    pub(super) decryption_request_receiver:
        Mutex<Option<UnboundedReceiver<DecryptionRetryRequest>>>,
}
211
212impl RedecryptorChannels {
213 pub(super) fn new() -> Self {
214 let (utd_reporter, _) = broadcast::channel(100);
215 let (decryption_request_sender, decryption_request_receiver) = unbounded_channel();
216
217 Self {
218 utd_reporter,
219 decryption_request_sender,
220 decryption_request_receiver: Mutex::new(Some(decryption_request_receiver)),
221 }
222 }
223}
224
225fn filter_timeline_event_to_utd(
230 event: TimelineEvent,
231) -> Option<(OwnedEventId, Raw<AnySyncTimelineEvent>)> {
232 let event_id = event.event_id();
233
234 let event = as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event);
237 event_id.zip(event)
240}
241
242fn filter_timeline_event_to_decrypted(
248 event: TimelineEvent,
249) -> Option<(OwnedEventId, DecryptedRoomEvent)> {
250 let event_id = event.event_id();
251
252 let event = as_variant!(event.kind, TimelineEventKind::Decrypted(event) => event);
253 event_id.zip(event)
256}
257
258impl EventCache {
259 async fn get_utds(
267 &self,
268 room_id: &RoomId,
269 session_id: SessionId<'_>,
270 ) -> Result<Vec<EventIdAndUtd>, EventCacheError> {
271 let events = match self.inner.store.lock().await? {
272 EventCacheStoreLockState::Clean(guard) | EventCacheStoreLockState::Dirty(guard) => {
277 guard.get_room_events(room_id, Some("m.room.encrypted"), Some(session_id)).await?
278 }
279 };
280
281 Ok(events.into_iter().filter_map(filter_timeline_event_to_utd).collect())
282 }
283
284 async fn get_utds_from_memory(&self) -> BTreeMap<OwnedRoomId, Vec<EventIdAndUtd>> {
287 let mut utds = BTreeMap::new();
288
289 for (room_id, caches) in self.inner.by_room.read().await.iter() {
290 let room_utds: Vec<_> = caches
291 .all_events()
292 .await
293 .into_iter()
294 .flatten()
295 .filter_map(filter_timeline_event_to_utd)
296 .collect();
297
298 utds.insert(room_id.to_owned(), room_utds);
299 }
300
301 utds
302 }
303
304 async fn get_decrypted_events(
305 &self,
306 room_id: &RoomId,
307 session_id: SessionId<'_>,
308 ) -> Result<Vec<EventIdAndEvent>, EventCacheError> {
309 let events = match self.inner.store.lock().await? {
310 EventCacheStoreLockState::Clean(guard) | EventCacheStoreLockState::Dirty(guard) => {
315 guard.get_room_events(room_id, None, Some(session_id)).await?
316 }
317 };
318
319 Ok(events.into_iter().filter_map(filter_timeline_event_to_decrypted).collect())
320 }
321
322 async fn get_decrypted_events_from_memory(
323 &self,
324 ) -> BTreeMap<OwnedRoomId, Vec<EventIdAndEvent>> {
325 let mut decrypted_events = BTreeMap::new();
326
327 for (room_id, caches) in self.inner.by_room.read().await.iter() {
328 let room_utds: Vec<_> = caches
329 .all_events()
330 .await
331 .into_iter()
332 .flatten()
333 .filter_map(filter_timeline_event_to_decrypted)
334 .collect();
335
336 decrypted_events.insert(room_id.to_owned(), room_utds);
337 }
338
339 decrypted_events
340 }
341
342 #[instrument(skip_all, fields(room_id))]
354 async fn on_resolved_utds(
355 &self,
356 room_id: &RoomId,
357 events: Vec<ResolvedUtd>,
358 ) -> Result<(), EventCacheError> {
359 if events.is_empty() {
360 trace!("No events were redecrypted or updated, nothing to replace");
361 return Ok(());
362 }
363
364 timer!("Resolving UTDs");
365
366 let (room_cache, _drop_handles) = self.for_room(room_id).await?;
368
369 let event_ids: BTreeSet<_> =
370 events.iter().cloned().map(|(event_id, _, _)| event_id).collect();
371
372 let (pinned_cache, ef_caches) = {
379 let mut state = room_cache.state().write().await?;
380
381 let pinned_cache = state.pinned_event_cache().cloned();
382 let ef_caches: Vec<_> = state.event_focused_caches().cloned().collect();
383
384 let mut new_events = Vec::with_capacity(events.len());
386 for (event_id, decrypted, actions) in &events {
387 if let Some((location, mut target_event)) = state.find_event(event_id).await? {
388 target_event.kind = TimelineEventKind::Decrypted(decrypted.clone());
389
390 if let Some(actions) = actions {
391 target_event.set_push_actions(actions.clone());
392 }
393
394 state.replace_event_at(location, target_event.clone()).await?;
397 new_events.push(target_event);
398 }
399 }
400
401 let receipt_event = None;
409
410 state
411 .post_process_new_events(
412 new_events,
413 PostProcessingOrigin::Redecryption,
414 receipt_event,
415 )
416 .await?;
417
418 room_cache.update_sender().send(
419 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
420 diffs: state.room_linked_chunk_mut().updates_as_vector_diffs(),
421 origin: EventsOrigin::Cache,
422 }),
423 Some(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() }),
424 );
425
426 (pinned_cache, ef_caches)
427 };
428 if let Some(pinned_cache) = pinned_cache {
434 pinned_cache.replace_utds(&events).await?;
435 }
436
437 join_all(ef_caches.iter().map(|cache| cache.replace_utds(&events))).await;
442
443 let report =
444 RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids };
445 let _ = self.inner.redecryption_channels.utd_reporter.send(report);
446
447 Ok(())
448 }
449
450 async fn decrypt_event(
452 &self,
453 room_id: &RoomId,
454 room: Option<&Room>,
455 push_context: Option<&PushContext>,
456 event: &Raw<EncryptedEvent>,
457 ) -> Option<(DecryptedRoomEvent, Option<Vec<Action>>)> {
458 if let Some(room) = room {
459 match room
460 .decrypt_event(
461 event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
462 push_context,
463 )
464 .await
465 {
466 Ok(maybe_decrypted) => {
467 let actions = maybe_decrypted.push_actions().map(|a| a.to_vec());
468
469 if let TimelineEventKind::Decrypted(decrypted) = maybe_decrypted.kind {
470 Some((decrypted, actions))
471 } else {
472 warn!(
473 "Failed to redecrypt an event despite receiving a room key or request to redecrypt"
474 );
475 None
476 }
477 }
478 Err(e) => {
479 warn!(
480 "Failed to redecrypt an event despite receiving a room key or request to redecrypt {e:?}"
481 );
482 None
483 }
484 }
485 } else {
486 let client = self.inner.client().ok()?;
487 let machine = client.olm_machine().await;
488 let machine = machine.as_ref()?;
489
490 match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await {
491 Ok(decrypted) => Some((decrypted, None)),
492 Err(e) => {
493 warn!(
494 "Failed to redecrypt an event despite receiving a room key or a request to redecrypt {e:?}"
495 );
496 None
497 }
498 }
499 }
500 }
501
502 #[instrument(skip_all, fields(room_id, session_id))]
505 async fn retry_decryption(
506 &self,
507 room_id: &RoomId,
508 session_id: SessionId<'_>,
509 ) -> Result<(), EventCacheError> {
510 let events = self.get_utds(room_id, session_id).await?;
512 self.retry_decryption_for_events(room_id, events).await
513 }
514
515 #[instrument(skip_all, fields(updates.linked_chunk_id))]
517 async fn retry_decryption_for_event_cache_updates(
518 &self,
519 updates: RoomEventCacheLinkedChunkUpdate,
520 ) -> Result<(), EventCacheError> {
521 let room_id = updates.linked_chunk_id.room_id();
522 let events: Vec<_> = updates
523 .updates
524 .into_iter()
525 .flat_map(|updates| updates.into_items())
526 .filter_map(filter_timeline_event_to_utd)
527 .collect();
528
529 self.retry_decryption_for_events(room_id, events).await
530 }
531
532 async fn retry_decryption_for_in_memory_events(&self) {
533 let utds = self.get_utds_from_memory().await;
534
535 for (room_id, utds) in utds.into_iter() {
536 if let Err(e) = self.retry_decryption_for_events(&room_id, utds).await {
537 warn!(%room_id, "Failed to redecrypt in-memory events {e:?}");
538 }
539 }
540 }
541
542 #[instrument(skip_all, fields(room_id, session_id))]
544 async fn retry_decryption_for_events(
545 &self,
546 room_id: &RoomId,
547 events: Vec<EventIdAndUtd>,
548 ) -> Result<(), EventCacheError> {
549 trace!("Retrying to decrypt");
550
551 if events.is_empty() {
552 trace!("No relevant events found.");
553 return Ok(());
554 }
555
556 let room = self.inner.client().ok().and_then(|client| client.get_room(room_id));
557 let push_context =
558 if let Some(room) = &room { room.push_context().await.ok().flatten() } else { None };
559
560 let mut decrypted_events = Vec::with_capacity(events.len());
562
563 for (event_id, event) in events {
564 if let Some((decrypted, actions)) = self
567 .decrypt_event(
568 room_id,
569 room.as_ref(),
570 push_context.as_ref(),
571 event.cast_ref_unchecked(),
572 )
573 .await
574 {
575 decrypted_events.push((event_id, decrypted, actions));
576 }
577 }
578
579 let event_ids: BTreeSet<_> =
580 decrypted_events.iter().map(|(event_id, _, _)| event_id).collect();
581
582 if !event_ids.is_empty() {
583 trace!(?event_ids, "Successfully redecrypted events");
584 }
585
586 self.on_resolved_utds(room_id, decrypted_events).await?;
589
590 Ok(())
591 }
592
593 async fn update_encryption_info_for_events(
595 &self,
596 room: &Room,
597 events: Vec<EventIdAndEvent>,
598 ) -> Result<(), EventCacheError> {
599 let mut updated_events = Vec::with_capacity(events.len());
601
602 for (event_id, mut event) in events {
603 if let Some(session_id) = event.encryption_info.session_id() {
604 let new_encryption_info =
605 room.get_encryption_info(session_id, &event.encryption_info.sender).await;
606
607 if let Some(new_encryption_info) = new_encryption_info
609 && event.encryption_info != new_encryption_info
610 {
611 event.encryption_info = new_encryption_info;
612 updated_events.push((event_id, event, None));
613 }
614 }
615 }
616
617 let event_ids: BTreeSet<_> =
618 updated_events.iter().map(|(event_id, _, _)| event_id).collect();
619
620 if !event_ids.is_empty() {
621 trace!(?event_ids, "Replacing the encryption info of some events");
622 }
623
624 self.on_resolved_utds(room.room_id(), updated_events).await
625 }
626
627 #[instrument(skip_all, fields(room_id, session_id))]
628 async fn update_encryption_info(
629 &self,
630 room_id: &RoomId,
631 session_id: SessionId<'_>,
632 ) -> Result<(), EventCacheError> {
633 trace!("Updating encryption info");
634
635 let Ok(client) = self.inner.client() else {
636 return Ok(());
637 };
638
639 let Some(room) = client.get_room(room_id) else {
640 return Ok(());
641 };
642
643 let events = self.get_decrypted_events(room_id, session_id).await?;
645
646 if events.is_empty() {
647 trace!("No relevant events found.");
648 return Ok(());
649 }
650
651 self.update_encryption_info_for_events(&room, events).await
653 }
654
655 async fn retry_update_encryption_info_for_in_memory_events(&self) {
656 let decrypted_events = self.get_decrypted_events_from_memory().await;
657
658 for (room_id, events) in decrypted_events.into_iter() {
659 let Some(room) = self.inner.client().ok().and_then(|c| c.get_room(&room_id)) else {
660 continue;
661 };
662
663 if let Err(e) = self.update_encryption_info_for_events(&room, events).await {
664 warn!(
665 %room_id,
666 "Failed to replace the encryption info for in-memory events {e:?}"
667 );
668 }
669 }
670 }
671
    /// Revisits every event held in memory: first retries decryption of the
    /// unable-to-decrypt events, then refreshes the encryption info of the
    /// decrypted ones.
    async fn retry_in_memory_events(&self) {
        self.retry_decryption_for_in_memory_events().await;
        self.retry_update_encryption_info_for_in_memory_events().await;
    }
686
687 pub fn request_decryption(&self, request: DecryptionRetryRequest) {
728 let _ =
729 self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err(
730 |_| warn!("Requesting a decryption while the redecryption task has been shut down"),
731 );
732 }
733
734 pub fn subscribe_to_decryption_reports(
785 &self,
786 ) -> impl Stream<Item = Result<RedecryptorReport, BroadcastStreamRecvError>> {
787 BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe())
788 }
789}
790
791#[inline(always)]
792fn upgrade_event_cache(cache: &Weak<EventCacheInner>) -> Option<EventCache> {
793 cache.upgrade().map(|inner| EventCache { inner })
794}
795
796async fn send_report_and_retry_memory_events(
797 cache: &Weak<EventCacheInner>,
798 report: RedecryptorReport,
799) -> Result<(), ()> {
800 let Some(cache) = upgrade_event_cache(cache) else {
801 return Err(());
802 };
803
804 cache.retry_in_memory_events().await;
805 let _ = cache.inner.redecryption_channels.utd_reporter.send(report);
806
807 Ok(())
808}
809
/// Owner of the background task that retries decryption of events.
pub(crate) struct Redecryptor {
    /// Handle of the background task; it is configured to abort the task
    /// when dropped.
    _task: BackgroundTaskHandle,
}
819
820impl Redecryptor {
821 pub(super) fn new(
826 client: &Client,
827 cache: Weak<EventCacheInner>,
828 receiver: UnboundedReceiver<DecryptionRetryRequest>,
829 linked_chunk_update_sender: &Sender<RoomEventCacheLinkedChunkUpdate>,
830 ) -> Self {
831 let linked_chunk_stream = BroadcastStream::new(linked_chunk_update_sender.subscribe());
832 let backup_state_stream = client.encryption().backups().state_stream();
833
834 let task = client
835 .task_monitor()
836 .spawn_infinite_task("event_cache::redecryptor", async {
837 let request_redecryption_stream = UnboundedReceiverStream::new(receiver);
838
839 Self::listen_for_room_keys_task(
840 cache,
841 request_redecryption_stream,
842 linked_chunk_stream,
843 backup_state_stream,
844 )
845 .await;
846 })
847 .abort_on_drop();
848
849 Self { _task: task }
850 }
851
852 async fn subscribe_to_room_key_stream(
857 cache: &Weak<EventCacheInner>,
858 ) -> Option<(
859 impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>,
860 impl Stream<Item = Vec<RoomKeyWithheldInfo>>,
861 )> {
862 let event_cache = cache.upgrade()?;
863 let client = event_cache.client().ok()?;
864 let machine = client.olm_machine().await;
865
866 machine.as_ref().map(|m| {
867 (m.store().room_keys_received_stream(), m.store().room_keys_withheld_received_stream())
868 })
869 }
870
    /// Runs one generation of the redecryption event loop.
    ///
    /// Subscribes to the room key streams of the current Olm machine and
    /// then multiplexes over five sources: explicit decryption requests, new
    /// room keys, withheld room key infos, linked chunk updates of the event
    /// cache, and backup state updates.
    ///
    /// Returns `true` when the room key stream or the withheld stream ended,
    /// which tells the caller to run the loop again with fresh streams.
    /// Returns `false` when the loop should stop for good: the room key
    /// streams could not be obtained, the event cache has been dropped, or
    /// the `else` branch of the `select!` was taken.
    async fn redecryption_loop(
        cache: &Weak<EventCacheInner>,
        decryption_request_stream: &mut Pin<&mut impl Stream<Item = DecryptionRetryRequest>>,
        events_stream: &mut Pin<
            &mut impl Stream<Item = Result<RoomEventCacheLinkedChunkUpdate, BroadcastStreamRecvError>>,
        >,
        backup_state_stream: &mut Pin<
            &mut impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
        >,
    ) -> bool {
        let Some((room_key_stream, withheld_stream)) =
            Self::subscribe_to_room_key_stream(cache).await
        else {
            return false;
        };

        pin_mut!(room_key_stream);
        pin_mut!(withheld_stream);

        loop {
            tokio::select! {
                // Explicit request: retry decryption for the listed sessions,
                // then refresh the encryption info for the other listed
                // sessions. Errors are logged and otherwise ignored.
                Some(request) = decryption_request_stream.next() => {
                    let Some(cache) = upgrade_event_cache(cache) else {
                        break false;
                    };

                    trace!(?request, "Received a redecryption request");

                    for session_id in request.utd_session_ids {
                        let _ = cache
                            .retry_decryption(&request.room_id, &session_id)
                            .await
                            .inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}"));
                    }

                    for session_id in request.refresh_info_session_ids {
                        let _ = cache.update_encryption_info(&request.room_id, &session_id).await.inspect_err(|e|
                            warn!(
                                room_id = %request.room_id,
                                session_id = session_id,
                                "Unable to update the encryption info {e:?}",
                            ));
                    }
                }
                // New room keys: every key is first used to retry decryption,
                // and only afterwards is the encryption info refreshed for
                // each of them.
                room_keys = room_key_stream.next() => {
                    match room_keys {
                        Some(Ok(room_keys)) => {
                            let Some(cache) = upgrade_event_cache(cache) else {
                                break false;
                            };

                            trace!(?room_keys, "Received new room keys");

                            for key in &room_keys {
                                let _ = cache
                                    .retry_decryption(&key.room_id, &key.session_id)
                                    .await
                                    .inspect_err(|e| warn!("Error redecrypting {e:?}"));
                            }

                            for key in room_keys {
                                let _ = cache.update_encryption_info(&key.room_id, &key.session_id).await.inspect_err(|e|
                                    warn!(
                                        room_id = %key.room_id,
                                        session_id = key.session_id,
                                        "Unable to update the encryption info {e:?}",
                                    ));
                            }
                        },
                        // Keys may have been missed: revisit everything held
                        // in memory and tell the listeners about the lag.
                        Some(Err(_)) => {
                            warn!("The room key stream lagged, reporting the lag to our listeners");

                            if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
                                break false;
                            }
                        },
                        // The stream ended: ask the caller to restart the loop
                        // so that a new stream gets created.
                        None => {
                            break true;
                        }
                    }
                }
                // Withheld infos only trigger a refresh of the encryption
                // info; no decryption is retried.
                withheld_info = withheld_stream.next() => {
                    match withheld_info {
                        Some(infos) => {
                            let Some(cache) = upgrade_event_cache(cache) else {
                                break false;
                            };

                            trace!(?infos, "Received new withheld infos");

                            for RoomKeyWithheldInfo { room_id, session_id, .. } in &infos {
                                let _ = cache.update_encryption_info(room_id, session_id).await.inspect_err(|e|
                                    warn!(
                                        room_id = %room_id,
                                        session_id = session_id,
                                        "Unable to update the encryption info {e:?}",
                                    ));
                            }
                        }
                        // Same as for the room key stream: restart the loop.
                        None => break true,
                    }
                }
                // Linked chunk updates: try to decrypt the unable-to-decrypt
                // events they contain.
                Some(event_updates) = events_stream.next() => {
                    match event_updates {
                        Ok(updates) => {
                            let Some(cache) = upgrade_event_cache(cache) else {
                                break false;
                            };

                            // Copied up front because `updates` is consumed by
                            // the call below but the ID is needed for logging.
                            let linked_chunk_id = updates.linked_chunk_id.to_owned();

                            let _ = cache.retry_decryption_for_event_cache_updates(updates).await.inspect_err(|e|
                                warn!(
                                    %linked_chunk_id,
                                    "Unable to handle UTDs from event cache updates {e:?}",
                                )
                            );
                        }
                        Err(_) => {
                            if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
                                break false;
                            }
                        }
                    }
                }
                // Backup state updates: only the switch to `Enabled` triggers
                // any work.
                Some(backup_state_update) = backup_state_stream.next() => {
                    match backup_state_update {
                        Ok(state) => {
                            match state {
                                BackupState::Unknown |
                                BackupState::Creating |
                                BackupState::Enabling |
                                BackupState::Resuming |
                                BackupState::Downloading |
                                BackupState::Disabling =>{
                                    // Nothing to do for these states.
                                }
                                BackupState::Enabled => {
                                    if send_report_and_retry_memory_events(cache, RedecryptorReport::BackupAvailable).await.is_err() {
                                        break false;
                                    }
                                }
                            }
                        }
                        Err(_) => {
                            if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
                                break false;
                            }
                        }
                    }
                }
                else => break false,
            }
        }
    }
1051
1052 async fn listen_for_room_keys_task(
1053 cache: Weak<EventCacheInner>,
1054 decryption_request_stream: UnboundedReceiverStream<DecryptionRetryRequest>,
1055 events_stream: BroadcastStream<RoomEventCacheLinkedChunkUpdate>,
1056 backup_state_stream: impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
1057 ) {
1058 pin_mut!(decryption_request_stream);
1062 pin_mut!(events_stream);
1063 pin_mut!(backup_state_stream);
1064
1065 while Self::redecryption_loop(
1066 &cache,
1067 &mut decryption_request_stream,
1068 &mut events_stream,
1069 &mut backup_state_stream,
1070 )
1071 .await
1072 {
1073 info!("Regenerating the re-decryption streams");
1074
1075 if send_report_and_retry_memory_events(&cache, RedecryptorReport::Lagging)
1078 .await
1079 .is_err()
1080 {
1081 break;
1082 }
1083 }
1084
1085 info!("Shutting down the event cache redecryptor");
1086 }
1087}
1088
1089#[cfg(not(target_family = "wasm"))]
1090#[cfg(test)]
1091mod tests {
1092 use std::{
1093 collections::BTreeSet,
1094 sync::{
1095 Arc,
1096 atomic::{AtomicBool, Ordering},
1097 },
1098 time::Duration,
1099 };
1100
1101 use assert_matches2::assert_matches;
1102 use async_trait::async_trait;
1103 use eyeball_im::VectorDiff;
1104 use matrix_sdk_base::{
1105 cross_process_lock::CrossProcessLockGeneration,
1106 crypto::types::events::{ToDeviceEvent, room::encrypted::ToDeviceEncryptedEventContent},
1107 deserialized_responses::{TimelineEventKind, VerificationState},
1108 event_cache::{
1109 Event, Gap,
1110 store::{EventCacheStore, EventCacheStoreError, MemoryStore},
1111 },
1112 linked_chunk::{
1113 ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
1114 RawChunk, Update,
1115 },
1116 locks::Mutex,
1117 sleep::sleep,
1118 store::StoreConfig,
1119 timeout::timeout,
1120 };
1121 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
1122 use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
1123 use ruma::{
1124 EventId, OwnedEventId, RoomId, RoomVersionId, device_id, event_id,
1125 events::{AnySyncTimelineEvent, relation::RelationType},
1126 room_id,
1127 serde::Raw,
1128 user_id,
1129 };
1130 use serde_json::json;
1131 use tokio::sync::oneshot::{self, Sender};
1132 use tracing::{Instrument, info};
1133
1134 use crate::{
1135 Client, assert_let_timeout,
1136 encryption::EncryptionSettings,
1137 event_cache::{
1138 DecryptionRetryRequest, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
1139 TimelineVectorDiffs,
1140 },
1141 test_utils::mocks::MatrixMockServer,
1142 };
1143
    /// An event cache store for tests that wraps a [`MemoryStore`] and holds
    /// back linked chunk updates until it is told to stop delaying.
    #[derive(Debug, Clone)]
    struct DelayingStore {
        /// The store every operation is forwarded to.
        memory_store: MemoryStore,
        /// While `true`, `handle_linked_chunk_updates` keeps waiting.
        delaying: Arc<AtomicBool>,
        /// One-shot sender used to tell `stop_delaying` that a delayed
        /// linked chunk update has been handled.
        foo: Arc<Mutex<Option<Sender<()>>>>,
    }
1154
1155 impl DelayingStore {
1156 fn new() -> Self {
1157 Self {
1158 memory_store: MemoryStore::new(),
1159 delaying: AtomicBool::new(true).into(),
1160 foo: Arc::new(Mutex::new(None)),
1161 }
1162 }
1163
1164 async fn stop_delaying(&self) {
1165 let (sender, receiver) = oneshot::channel();
1166
1167 {
1168 *self.foo.lock() = Some(sender);
1169 }
1170
1171 self.delaying.store(false, Ordering::SeqCst);
1172
1173 receiver.await.expect("We should be able to receive a response")
1174 }
1175 }
1176
    // Every method forwards to the wrapped `MemoryStore`; only
    // `handle_linked_chunk_updates` adds behaviour of its own.
    //
    // NOTE(review): the enclosing `tests` module is only compiled for
    // non-wasm targets, so the `?Send` variant below never applies here.
    #[cfg_attr(target_family = "wasm", async_trait(?Send))]
    #[cfg_attr(not(target_family = "wasm"), async_trait)]
    impl EventCacheStore for DelayingStore {
        type Error = EventCacheStoreError;

        async fn close(&self) -> Result<(), EventCacheStoreError> {
            self.memory_store.close().await
        }

        async fn reopen(&self) -> Result<(), EventCacheStoreError> {
            self.memory_store.reopen().await
        }

        async fn try_take_leased_lock(
            &self,
            lease_duration_ms: u32,
            key: &str,
            holder: &str,
        ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
            self.memory_store.try_take_leased_lock(lease_duration_ms, key, holder).await
        }

        /// Waits, polling every 10 milliseconds, until the delay has been
        /// lifted, forwards the updates to the memory store, and then
        /// notifies a pending `stop_delaying` call if one registered a
        /// sender.
        async fn handle_linked_chunk_updates(
            &self,
            linked_chunk_id: LinkedChunkId<'_>,
            updates: Vec<Update<Event, Gap>>,
        ) -> Result<(), Self::Error> {
            while self.delaying.load(Ordering::SeqCst) {
                sleep(Duration::from_millis(10)).await;
            }

            // Take the sender out before the write so that the lock is not
            // held across the await below.
            let sender = self.foo.lock().take();
            let ret = self.memory_store.handle_linked_chunk_updates(linked_chunk_id, updates).await;

            if let Some(sender) = sender {
                sender.send(()).expect("We should be able to notify the other side that we're done with the storage operation");
            }

            ret
        }

        async fn load_all_chunks(
            &self,
            linked_chunk_id: LinkedChunkId<'_>,
        ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
            self.memory_store.load_all_chunks(linked_chunk_id).await
        }

        async fn load_all_chunks_metadata(
            &self,
            linked_chunk_id: LinkedChunkId<'_>,
        ) -> Result<Vec<ChunkMetadata>, Self::Error> {
            self.memory_store.load_all_chunks_metadata(linked_chunk_id).await
        }

        async fn load_last_chunk(
            &self,
            linked_chunk_id: LinkedChunkId<'_>,
        ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
            self.memory_store.load_last_chunk(linked_chunk_id).await
        }

        async fn load_previous_chunk(
            &self,
            linked_chunk_id: LinkedChunkId<'_>,
            before_chunk_identifier: ChunkIdentifier,
        ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
            self.memory_store.load_previous_chunk(linked_chunk_id, before_chunk_identifier).await
        }

        async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
            self.memory_store.clear_all_linked_chunks().await
        }

        async fn filter_duplicated_events(
            &self,
            linked_chunk_id: LinkedChunkId<'_>,
            events: Vec<OwnedEventId>,
        ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
            self.memory_store.filter_duplicated_events(linked_chunk_id, events).await
        }

        async fn find_event(
            &self,
            room_id: &RoomId,
            event_id: &EventId,
        ) -> Result<Option<Event>, Self::Error> {
            self.memory_store.find_event(room_id, event_id).await
        }

        async fn find_event_relations(
            &self,
            room_id: &RoomId,
            event_id: &EventId,
            filters: Option<&[RelationType]>,
        ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
            self.memory_store.find_event_relations(room_id, event_id, filters).await
        }

        async fn get_room_events(
            &self,
            room_id: &RoomId,
            event_type: Option<&str>,
            session_id: Option<&str>,
        ) -> Result<Vec<Event>, Self::Error> {
            self.memory_store.get_room_events(room_id, event_type, session_id).await
        }

        async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
            self.memory_store.save_event(room_id, event).await
        }

        async fn optimize(&self) -> Result<(), Self::Error> {
            self.memory_store.optimize().await
        }

        async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
            self.memory_store.get_size().await
        }
    }
1302
    /// Creates the clients of Alice and Bob against a fresh mock server and
    /// makes both of them join the given encrypted room.
    ///
    /// * `alice_enables_cross_signing` is used as Alice's
    ///   `auto_enable_cross_signing` setting; Bob always has it enabled.
    /// * `use_delayed_store` makes Bob use a [`DelayingStore`] as his event
    ///   cache store; that store is then returned as the last tuple element.
    ///
    /// Bob's event cache is subscribed before the first sync.
    async fn set_up_clients(
        room_id: &RoomId,
        alice_enables_cross_signing: bool,
        use_delayed_store: bool,
    ) -> (Client, Client, MatrixMockServer, Option<DelayingStore>) {
        // Separate spans so that the logs of the two clients can be told
        // apart.
        let alice_span = tracing::info_span!("alice");
        let bob_span = tracing::info_span!("bob");

        let alice_user_id = user_id!("@alice:localhost");
        let alice_device_id = device_id!("ALICEDEVICE");
        let bob_user_id = user_id!("@bob:localhost");
        let bob_device_id = device_id!("BOBDEVICE");

        let matrix_mock_server = MatrixMockServer::new().await;
        matrix_mock_server.mock_crypto_endpoints_preset().await;

        let encryption_settings = EncryptionSettings {
            auto_enable_cross_signing: alice_enables_cross_signing,
            ..Default::default()
        };

        let alice = matrix_mock_server
            .client_builder_for_crypto_end_to_end(alice_user_id, alice_device_id)
            .on_builder(|builder| {
                builder
                    .with_enable_share_history_on_invite(true)
                    .with_encryption_settings(encryption_settings)
            })
            .build()
            .instrument(alice_span.clone())
            .await;

        // Bob's settings shadow Alice's from here on.
        let encryption_settings =
            EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() };

        let (store_config, store) = if use_delayed_store {
            let store = DelayingStore::new();

            (
                StoreConfig::new(CrossProcessLockConfig::multi_process(
                    "delayed_store_event_cache_test",
                ))
                .event_cache_store(store.clone()),
                Some(store),
            )
        } else {
            (
                StoreConfig::new(CrossProcessLockConfig::multi_process(
                    "normal_store_event_cache_test",
                )),
                None,
            )
        };

        let bob = matrix_mock_server
            .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id)
            .on_builder(|builder| {
                builder
                    .with_enable_share_history_on_invite(true)
                    .with_encryption_settings(encryption_settings)
                    .store_config(store_config)
            })
            .build()
            .instrument(bob_span.clone())
            .await;

        bob.event_cache().subscribe().expect("Bob should be able to enable the event cache");

        matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await;

        let event_factory = EventFactory::new().room(room_id).sender(alice_user_id);

        // The room both clients sync: created by Alice and with encryption
        // enabled.
        let room_builder = JoinedRoomBuilder::new(room_id)
            .add_state_event(event_factory.create(alice_user_id, RoomVersionId::V1))
            .add_state_event(event_factory.room_encryption());

        matrix_mock_server
            .mock_sync()
            .ok_and_run(&alice, |builder| {
                builder.add_joined_room(room_builder.clone());
            })
            .instrument(alice_span)
            .await;

        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_joined_room(room_builder);
            })
            .instrument(bob_span)
            .await;

        (alice, bob, matrix_mock_server, store)
    }
1401
    /// Makes Alice send an `m.room.message` event into the room and captures
    /// what she sent.
    ///
    /// Returns the captured room event and the captured to-device event.
    // NOTE(review): the to-device event is presumably the encrypted room key
    // meant for Bob — confirm against `mock_capture_put_to_device`.
    async fn prepare_room(
        matrix_mock_server: &MatrixMockServer,
        event_factory: &EventFactory,
        alice: &Client,
        bob: &Client,
        room_id: &RoomId,
    ) -> (Raw<AnySyncTimelineEvent>, Raw<ToDeviceEvent<ToDeviceEncryptedEventContent>>) {
        let alice_user_id = alice.user_id().unwrap();
        let bob_user_id = bob.user_id().unwrap();

        let alice_member_event = event_factory.member(alice_user_id).into_raw();
        let bob_member_event = event_factory.member(bob_user_id).into_raw();

        let room = alice
            .get_room(room_id)
            .expect("Alice should have access to the room now that we synced");

        let event_type = "m.room.message";
        let content = json!({"body": "It's a secret to everybody", "msgtype": "m.text"});

        let event_id = event_id!("$some_id");
        let (event_receiver, mock) =
            matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id);
        let (_guard, room_key) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await;

        {
            // The send mock is mounted only for the duration of this block.
            let _guard = mock.mock_once().mount_as_scoped().await;

            // Both users are reported as members of the room.
            matrix_mock_server
                .mock_get_members()
                .ok(vec![alice_member_event.clone(), bob_member_event.clone()])
                .mock_once()
                .mount()
                .await;

            room.send_raw(event_type, content)
                .await
                .expect("We should be able to send an initial message");
        };

        let event = event_receiver.await.expect("Alice should have sent the event by now");
        let room_key = room_key.await;

        (event, room_key)
    }
1452
1453 #[async_test]
1454 async fn test_redecryptor() {
1455 let room_id = room_id!("!test:localhost");
1456
1457 let event_factory = EventFactory::new().room(room_id);
1458 let (alice, bob, matrix_mock_server, _) = set_up_clients(room_id, true, false).await;
1459
1460 let (event, room_key) =
1461 prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;
1462
1463 let event_cache = bob.event_cache();
1466 let (room_cache, _) = event_cache
1467 .for_room(room_id)
1468 .await
1469 .expect("We should be able to get to the event cache for a specific room");
1470
1471 let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
1472 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1473
1474 bob.inner
1477 .base_client
1478 .regenerate_olm(None)
1479 .await
1480 .expect("We should be able to regenerate the Olm machine");
1481
1482 matrix_mock_server
1484 .mock_sync()
1485 .ok_and_run(&bob, |builder| {
1486 builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
1487 })
1488 .await;
1489
1490 assert_let_timeout!(
1493 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1494 subscriber.recv()
1495 );
1496
1497 assert_eq!(diffs.len(), 1);
1500 assert_matches!(&diffs[0], VectorDiff::Append { values });
1501 assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });
1502
1503 assert_let_timeout!(
1504 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1505 );
1506 assert_eq!(expected_room_id, room_id);
1507 assert!(generic_stream.is_empty());
1508
1509 matrix_mock_server
1511 .mock_sync()
1512 .ok_and_run(&bob, |builder| {
1513 builder.add_to_device_event(
1514 room_key
1515 .deserialize_as()
1516 .expect("We should be able to deserialize the room key"),
1517 );
1518 })
1519 .await;
1520
1521 assert_let_timeout!(
1523 Duration::from_secs(1),
1524 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1525 subscriber.recv()
1526 );
1527
1528 assert_eq!(diffs.len(), 1);
1530 assert_matches!(&diffs[0], VectorDiff::Set { index, value });
1531 assert_eq!(*index, 0);
1532 assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
1533
1534 assert_let_timeout!(
1535 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1536 );
1537 assert_eq!(expected_room_id, room_id);
1538 assert!(generic_stream.is_empty());
1539 }
1540
1541 #[async_test]
1542 async fn test_redecryptor_updating_encryption_info() {
1543 let bob_span = tracing::info_span!("bob");
1544
1545 let room_id = room_id!("!test:localhost");
1546
1547 let event_factory = EventFactory::new().room(room_id);
1548 let (alice, bob, matrix_mock_server, _) = set_up_clients(room_id, false, false).await;
1549
1550 let (event, room_key) =
1551 prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;
1552
1553 let event_cache = bob.event_cache();
1556 let (room_cache, _) = event_cache
1557 .for_room(room_id)
1558 .instrument(bob_span.clone())
1559 .await
1560 .expect("We should be able to get to the event cache for a specific room");
1561
1562 let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
1563 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1564
1565 matrix_mock_server
1567 .mock_sync()
1568 .ok_and_run(&bob, |builder| {
1569 builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
1570 })
1571 .instrument(bob_span.clone())
1572 .await;
1573
1574 assert_let_timeout!(
1577 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1578 subscriber.recv()
1579 );
1580
1581 assert_eq!(diffs.len(), 1);
1584 assert_matches!(&diffs[0], VectorDiff::Append { values });
1585 assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });
1586
1587 assert_let_timeout!(
1588 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1589 );
1590 assert_eq!(expected_room_id, room_id);
1591 assert!(generic_stream.is_empty());
1592
1593 matrix_mock_server
1595 .mock_sync()
1596 .ok_and_run(&bob, |builder| {
1597 builder.add_to_device_event(
1598 room_key
1599 .deserialize_as()
1600 .expect("We should be able to deserialize the room key"),
1601 );
1602 })
1603 .instrument(bob_span.clone())
1604 .await;
1605
1606 assert_let_timeout!(
1608 Duration::from_secs(1),
1609 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1610 subscriber.recv()
1611 );
1612
1613 assert_eq!(diffs.len(), 1);
1615 assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value });
1616 assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
1617
1618 let encryption_info = value.encryption_info().unwrap();
1619 assert_matches!(&encryption_info.verification_state, VerificationState::Unverified(_));
1620
1621 assert_let_timeout!(
1622 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1623 );
1624 assert_eq!(expected_room_id, room_id);
1625 assert!(generic_stream.is_empty());
1626
1627 let session_id = encryption_info.session_id().unwrap().to_owned();
1628 let alice_user_id = alice.user_id().unwrap();
1629
1630 alice
1632 .encryption()
1633 .bootstrap_cross_signing(None)
1634 .await
1635 .expect("Alice should be able to create the cross-signing keys");
1636
1637 bob.update_tracked_users_for_testing([alice_user_id]).instrument(bob_span.clone()).await;
1638 matrix_mock_server
1639 .mock_sync()
1640 .ok_and_run(&bob, |builder| {
1641 builder.add_change_device(alice_user_id);
1642 })
1643 .instrument(bob_span.clone())
1644 .await;
1645
1646 bob.event_cache().request_decryption(DecryptionRetryRequest {
1647 room_id: room_id.into(),
1648 utd_session_ids: BTreeSet::new(),
1649 refresh_info_session_ids: BTreeSet::from([session_id]),
1650 });
1651
1652 assert_let_timeout!(
1655 Duration::from_secs(1),
1656 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1657 subscriber.recv()
1658 );
1659
1660 assert_eq!(diffs.len(), 1);
1661 assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value });
1662 assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
1663 let encryption_info = value.encryption_info().unwrap();
1664
1665 assert_matches!(
1666 &encryption_info.verification_state,
1667 VerificationState::Unverified(_),
1668 "The event should now know about the identity but still be unverified"
1669 );
1670
1671 assert_let_timeout!(
1672 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1673 );
1674 assert_eq!(expected_room_id, room_id);
1675 assert!(generic_stream.is_empty());
1676 }
1677
1678 #[async_test]
1679 async fn test_event_is_redecrypted_even_if_key_arrives_while_event_processing() {
1680 let room_id = room_id!("!test:localhost");
1681
1682 let event_factory = EventFactory::new().room(room_id);
1683 let (alice, bob, matrix_mock_server, delayed_store) =
1684 set_up_clients(room_id, true, true).await;
1685
1686 let delayed_store = delayed_store.unwrap();
1687
1688 let (event, room_key) =
1689 prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;
1690
1691 let event_cache = bob.event_cache();
1692
1693 let (room_cache, _) = event_cache
1695 .for_room(room_id)
1696 .await
1697 .expect("We should be able to get to the event cache for a specific room");
1698
1699 let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
1700 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1701
1702 matrix_mock_server
1704 .mock_sync()
1705 .ok_and_run(&bob, |builder| {
1706 builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
1707 })
1708 .await;
1709
1710 matrix_mock_server
1712 .mock_sync()
1713 .ok_and_run(&bob, |builder| {
1714 builder.add_to_device_event(
1715 room_key
1716 .deserialize_as()
1717 .expect("We should be able to deserialize the room key"),
1718 );
1719 })
1720 .await;
1721
1722 info!("Stopping the delay");
1723 delayed_store.stop_delaying().await;
1724
1725 assert_let_timeout!(
1732 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1733 subscriber.recv()
1734 );
1735
1736 assert_eq!(diffs.len(), 1);
1739 assert_matches!(&diffs[0], VectorDiff::Append { values });
1740 assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });
1741
1742 assert_let_timeout!(
1743 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1744 );
1745 assert_eq!(expected_room_id, room_id);
1746
1747 assert_let_timeout!(
1749 Duration::from_secs(1),
1750 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1751 subscriber.recv()
1752 );
1753
1754 assert_eq!(diffs.len(), 1);
1756 assert_matches!(&diffs[0], VectorDiff::Set { index, value });
1757 assert_eq!(*index, 0);
1758 assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
1759
1760 assert_let_timeout!(
1761 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1762 );
1763 assert_eq!(expected_room_id, room_id);
1764 assert!(generic_stream.is_empty());
1765 }
1766
1767 #[async_test]
1771 async fn test_redecryptor_no_deadlock_with_event_focused_cache_pagination() {
1772 use crate::{
1773 event_cache::EventFocusThreadMode,
1774 test_utils::mocks::{RoomContextResponseTemplate, RoomMessagesResponseTemplate},
1775 };
1776
1777 let room_id = room_id!("!test:localhost");
1778 let f = EventFactory::new().room(room_id);
1779 let (alice, bob, server, _) = set_up_clients(room_id, true, false).await;
1780
1781 let (encrypted_event, room_key) = prepare_room(&server, &f, &alice, &bob, room_id).await;
1782
1783 let event_cache = bob.event_cache();
1784 let (room_cache, _drop_handles) = event_cache
1785 .for_room(room_id)
1786 .await
1787 .expect("Bob should have an event cache for the room");
1788
1789 let (_initial_events, mut subscriber) = room_cache.subscribe().await.unwrap();
1790
1791 server
1793 .mock_sync()
1794 .ok_and_run(&bob, |builder| {
1795 builder.add_joined_room(
1796 JoinedRoomBuilder::new(room_id).add_timeline_event(encrypted_event),
1797 );
1798 })
1799 .await;
1800
1801 assert_let_timeout!(
1803 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1804 subscriber.recv()
1805 );
1806 assert_eq!(diffs.len(), 1);
1807 assert_matches!(&diffs[0], VectorDiff::Append { values });
1808 assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });
1809
1810 let focused_event_id = event_id!("$focused");
1812 let bob_user_id = bob.user_id().unwrap();
1813
1814 server
1815 .mock_room_event_context()
1816 .expect_any_access_token()
1817 .ok(RoomContextResponseTemplate::new(
1818 f.text_msg("focused msg")
1819 .sender(bob_user_id)
1820 .event_id(focused_event_id)
1821 .into_event(),
1822 )
1823 .start("back-token"))
1824 .mock_once()
1825 .mount()
1826 .await;
1827
1828 let event_focused_cache = room_cache
1829 .get_or_create_event_focused_cache(
1830 focused_event_id.to_owned(),
1831 20,
1832 EventFocusThreadMode::Automatic,
1833 )
1834 .await
1835 .unwrap();
1836
1837 server
1839 .mock_room_messages()
1840 .expect_any_access_token()
1841 .ok(RoomMessagesResponseTemplate::default().with_delay(Duration::from_secs(5)))
1842 .mock_once()
1843 .mount()
1844 .await;
1845
1846 let event_focused_cache_clone = event_focused_cache.clone();
1849 let pagination_task = tokio::spawn(async move {
1850 let _ = event_focused_cache_clone.paginate_backwards(20).await;
1851 });
1852
1853 sleep(Duration::from_millis(200)).await;
1855
1856 server
1864 .mock_sync()
1865 .ok_and_run(&bob, |builder| {
1866 builder.add_to_device_event(
1867 room_key
1868 .deserialize_as()
1869 .expect("We should be able to deserialize the room key"),
1870 );
1871 })
1872 .await;
1873
1874 sleep(Duration::from_secs(1)).await;
1876
1877 let (_events, _subscriber) = timeout(room_cache.subscribe(), Duration::from_millis(100))
1880 .await
1881 .expect("subscribing shouldn't timeout")
1882 .expect("subscribing should succeed");
1883
1884 pagination_task.abort();
1885 }
1886}