1use std::{
117 collections::{BTreeMap, BTreeSet},
118 pin::Pin,
119 sync::Weak,
120};
121
122use as_variant::as_variant;
123use futures_core::Stream;
124use futures_util::{StreamExt, future::try_join_all, pin_mut};
125#[cfg(doc)]
126use matrix_sdk_base::{BaseClient, crypto::OlmMachine};
127use matrix_sdk_base::{
128 crypto::{
129 store::types::{RoomKeyInfo, RoomKeyWithheldInfo},
130 types::events::room::encrypted::EncryptedEvent,
131 },
132 deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind},
133 locks::Mutex,
134 task_monitor::BackgroundTaskHandle,
135 timer,
136};
137#[cfg(doc)]
138use matrix_sdk_common::deserialized_responses::EncryptionInfo;
139use ruma::{
140 OwnedEventId, OwnedRoomId, RoomId,
141 events::{AnySyncTimelineEvent, room::encrypted::OriginalSyncRoomEncryptedEvent},
142 push::Action,
143 serde::Raw,
144};
145use tokio::sync::{
146 broadcast::{self, Sender},
147 mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
148};
149use tokio_stream::wrappers::{
150 BroadcastStream, UnboundedReceiverStream, errors::BroadcastStreamRecvError,
151};
152use tracing::{info, instrument, trace, warn};
153
154#[cfg(doc)]
155use super::RoomEventCache;
156use super::{
157 EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheGenericUpdate,
158 RoomEventCacheUpdate, TimelineVectorDiffs, caches::room::RoomEventCacheLinkedChunkUpdate,
159};
160use crate::{Client, Result, Room, encryption::backups::BackupState, room::PushContext};
161
/// A borrowed identifier of a room key session.
type SessionId<'a> = &'a str;
/// The owned counterpart of [`SessionId`].
type OwnedSessionId = String;

/// The ID of an event together with its raw form, for an event that could
/// not be decrypted (see [`filter_timeline_event_to_utd`]).
type EventIdAndUtd = (OwnedEventId, Raw<AnySyncTimelineEvent>);
/// The ID of an event together with its decrypted form (see
/// [`filter_timeline_event_to_decrypted`]).
type EventIdAndEvent = (OwnedEventId, DecryptedRoomEvent);
/// An event ID, the decrypted event that should replace the cached one, and
/// the push actions to set on the replaced event (left untouched when
/// `None`).
pub(in crate::event_cache) type ResolvedUtd =
    (OwnedEventId, DecryptedRoomEvent, Option<Vec<Action>>);
169
/// A request, sent with [`EventCache::request_decryption`], asking the
/// redecryptor task to redo some work for the given sessions of a room.
#[derive(Debug, Clone)]
pub struct DecryptionRetryRequest {
    /// The room the sessions listed below belong to.
    pub room_id: OwnedRoomId,
    /// Sessions for which the decryption of cached undecryptable events
    /// should be retried.
    pub utd_session_ids: BTreeSet<OwnedSessionId>,
    /// Sessions for which the encryption info of cached, already decrypted
    /// events should be refreshed.
    pub refresh_info_session_ids: BTreeSet<OwnedSessionId>,
}
182
/// A report broadcast by the redecryptor to the subscribers obtained with
/// [`EventCache::subscribe_to_decryption_reports`].
#[derive(Debug, Clone)]
pub enum RedecryptorReport {
    /// Events of a room were handed to the caches for replacement, either
    /// because they were redecrypted or because their encryption info
    /// changed.
    ResolvedUtds {
        /// The room the events belong to.
        room_id: OwnedRoomId,
        /// The IDs of the events that were processed.
        events: BTreeSet<OwnedEventId>,
    },
    /// One of the streams the redecryptor listens to lagged or had to be
    /// recreated; the in-memory events have been retried before this report
    /// is sent.
    Lagging,
    /// The key backup reached the `Enabled` state; the in-memory events have
    /// been retried before this report is sent.
    BackupAvailable,
}
202
/// The channels used to talk to, and to receive reports from, the
/// redecryptor task.
pub(super) struct RedecryptorChannels {
    /// Broadcast sender used to publish [`RedecryptorReport`]s.
    utd_reporter: Sender<RedecryptorReport>,
    /// Sending half of the channel carrying explicit redecryption requests.
    pub(super) decryption_request_sender: UnboundedSender<DecryptionRetryRequest>,
    /// Receiving half of the request channel, stored as an `Option` behind a
    /// mutex.
    // NOTE(review): presumably taken out exactly once, when the `Redecryptor`
    // task is created -- confirm against the caller.
    pub(super) decryption_request_receiver:
        Mutex<Option<UnboundedReceiver<DecryptionRetryRequest>>>,
}
209
210impl RedecryptorChannels {
211 pub(super) fn new() -> Self {
212 let (utd_reporter, _) = broadcast::channel(100);
213 let (decryption_request_sender, decryption_request_receiver) = unbounded_channel();
214
215 Self {
216 utd_reporter,
217 decryption_request_sender,
218 decryption_request_receiver: Mutex::new(Some(decryption_request_receiver)),
219 }
220 }
221}
222
223fn filter_timeline_event_to_utd(
228 event: TimelineEvent,
229) -> Option<(OwnedEventId, Raw<AnySyncTimelineEvent>)> {
230 let event_id = event.event_id().map(ToOwned::to_owned);
231
232 let event = as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event);
235 event_id.zip(event)
238}
239
240fn filter_timeline_event_to_decrypted(
246 event: TimelineEvent,
247) -> Option<(OwnedEventId, DecryptedRoomEvent)> {
248 let event_id = event.event_id().map(ToOwned::to_owned);
249
250 let event = as_variant!(event.kind, TimelineEventKind::Decrypted(event) => event);
251 event_id.zip(event)
254}
255
256impl EventCache {
257 async fn all_encrypted_events(
265 &self,
266 room_id: &RoomId,
267 session_id: SessionId<'_>,
268 ) -> Result<Vec<EventIdAndUtd>, EventCacheError> {
269 let caches = self.inner.all_caches_for_room(room_id).await?;
270
271 Ok(caches
272 .all_events_of_type(Some("m.room.encrypted"), Some(session_id))
273 .await?
274 .filter_map(filter_timeline_event_to_utd)
275 .collect())
276 }
277
278 async fn all_in_memory_encrypted_events(&self) -> BTreeMap<OwnedRoomId, Vec<EventIdAndUtd>> {
281 let mut utds = BTreeMap::new();
282
283 for (room_id, caches) in self.inner.by_room.read().await.iter() {
284 let room_utds: Vec<_> = caches
285 .all_in_memory_events()
286 .await
287 .into_iter()
288 .flatten()
289 .filter_map(filter_timeline_event_to_utd)
290 .collect();
291
292 utds.insert(room_id.to_owned(), room_utds);
293 }
294
295 utds
296 }
297
298 async fn all_decrypted_events(
299 &self,
300 room_id: &RoomId,
301 session_id: SessionId<'_>,
302 ) -> Result<Vec<EventIdAndEvent>, EventCacheError> {
303 let caches = self.inner.all_caches_for_room(room_id).await?;
304
305 Ok(caches
306 .all_events_of_type(None, Some(session_id))
307 .await?
308 .filter_map(filter_timeline_event_to_decrypted)
309 .collect())
310 }
311
312 async fn all_in_memory_decrypted_events(&self) -> BTreeMap<OwnedRoomId, Vec<EventIdAndEvent>> {
313 let mut decrypted_events = BTreeMap::new();
314
315 for (room_id, caches) in self.inner.by_room.read().await.iter() {
316 let room_utds: Vec<_> = caches
317 .all_in_memory_events()
318 .await
319 .into_iter()
320 .flatten()
321 .filter_map(filter_timeline_event_to_decrypted)
322 .collect();
323
324 decrypted_events.insert(room_id.to_owned(), room_utds);
325 }
326
327 decrypted_events
328 }
329
330 #[instrument(skip_all, fields(room_id))]
342 async fn on_resolved_utds(
343 &self,
344 room_id: &RoomId,
345 events: Vec<ResolvedUtd>,
346 ) -> Result<(), EventCacheError> {
347 if events.is_empty() {
348 trace!("No events were redecrypted or updated, nothing to replace");
349 return Ok(());
350 }
351
352 timer!("Resolving UTDs");
353
354 let event_ids: BTreeSet<_> =
355 events.iter().cloned().map(|(event_id, _, _)| event_id).collect();
356
357 let all_caches = self.inner.all_caches_for_room(room_id).await?;
358
359 {
361 let room_cache = &all_caches.room;
362 let mut state = room_cache.state().write().await?;
363
364 let mut new_events = Vec::with_capacity(events.len());
365
366 for (event_id, decrypted, actions) in &events {
367 if let Some((location, mut target_event)) = state.find_event(event_id).await?
368 && (
369 matches!(target_event.kind, TimelineEventKind::UnableToDecrypt { .. })
380 || target_event.encryption_info() != Some(&decrypted.encryption_info)
381 )
382 {
383 target_event.kind = TimelineEventKind::Decrypted(decrypted.clone());
384
385 if let Some(actions) = actions {
386 target_event.set_push_actions(actions.clone());
387 }
388
389 state.replace_event_at(location, target_event.clone()).await?;
392 new_events.push(target_event);
393 }
394 }
395
396 let receipt_event = None;
404
405 state.post_process_new_events(new_events, receipt_event).await?;
406
407 let updates_as_vector_diffs = state.room_linked_chunk_mut().updates_as_vector_diffs();
408
409 if !updates_as_vector_diffs.is_empty() {
410 room_cache.update_sender().send(
411 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
412 diffs: updates_as_vector_diffs,
413 origin: EventsOrigin::Cache,
414 }),
415 Some(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() }),
416 );
417 }
418 }
419
420 {
422 for (thread_id, thread_cache) in try_join_all(
430 all_caches.threads.read().await.iter().map(|(thread_id, thread_cache)| async {
431 Result::<_, EventCacheError>::Ok(
432 thread_cache
435 .replace_utds(&events)
436 .await?
437 .then(|| (thread_id.clone(), thread_cache.clone())),
438 )
439 }),
440 )
441 .await?
442 .into_iter()
443 .flatten()
445 {
446 let new_thread_summary =
447 thread_cache.state().read().await?.compute_thread_summary().await?;
448
449 all_caches.room.update_thread_summary(&thread_id, new_thread_summary).await?;
450 }
451 }
452
453 if let Some(pinned_events_cache) = all_caches.pinned_events.get() {
455 pinned_events_cache.replace_utds(&events).await?;
456 }
457
458 {
460 try_join_all(
465 all_caches
466 .event_focused
467 .read()
468 .await
469 .values()
470 .map(|event_focused_cache| event_focused_cache.replace_utds(&events)),
471 )
472 .await?;
473 }
474
475 let report =
476 RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids };
477 let _ = self.inner.redecryption_channels.utd_reporter.send(report);
478
479 Ok(())
480 }
481
482 async fn decrypt_event(
484 &self,
485 room_id: &RoomId,
486 room: Option<&Room>,
487 push_context: Option<&PushContext>,
488 event: &Raw<EncryptedEvent>,
489 ) -> Option<(DecryptedRoomEvent, Option<Vec<Action>>)> {
490 if let Some(room) = room {
491 match room
492 .decrypt_event(
493 event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
494 push_context,
495 )
496 .await
497 {
498 Ok(maybe_decrypted) => {
499 let actions = maybe_decrypted.push_actions().map(|a| a.to_vec());
500
501 if let TimelineEventKind::Decrypted(decrypted) = maybe_decrypted.kind {
502 Some((decrypted, actions))
503 } else {
504 warn!(
505 "Failed to redecrypt an event despite receiving a room key or request to redecrypt"
506 );
507 None
508 }
509 }
510 Err(e) => {
511 warn!(
512 "Failed to redecrypt an event despite receiving a room key or request to redecrypt {e:?}"
513 );
514 None
515 }
516 }
517 } else {
518 let client = self.inner.client().ok()?;
519 let machine = client.olm_machine().await;
520 let machine = machine.as_ref()?;
521
522 match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await {
523 Ok(decrypted) => Some((decrypted, None)),
524 Err(e) => {
525 warn!(
526 "Failed to redecrypt an event despite receiving a room key or a request to redecrypt {e:?}"
527 );
528 None
529 }
530 }
531 }
532 }
533
534 #[instrument(skip_all, fields(room_id, session_id))]
537 async fn retry_decryption(
538 &self,
539 room_id: &RoomId,
540 session_id: SessionId<'_>,
541 ) -> Result<(), EventCacheError> {
542 let events = self.all_encrypted_events(room_id, session_id).await?;
544 self.retry_decryption_for_events(room_id, events).await
545 }
546
547 #[instrument(skip_all, fields(updates.linked_chunk_id))]
549 async fn retry_decryption_for_event_cache_updates(
550 &self,
551 updates: RoomEventCacheLinkedChunkUpdate,
552 ) -> Result<(), EventCacheError> {
553 let room_id = updates.linked_chunk_id.room_id();
554 let events: Vec<_> = updates
555 .updates
556 .into_iter()
557 .flat_map(|updates| updates.into_items())
558 .filter_map(filter_timeline_event_to_utd)
559 .collect();
560
561 self.retry_decryption_for_events(room_id, events).await
562 }
563
564 async fn retry_decryption_for_in_memory_events(&self) {
565 let utds = self.all_in_memory_encrypted_events().await;
566
567 for (room_id, utds) in utds.into_iter() {
568 if let Err(e) = self.retry_decryption_for_events(&room_id, utds).await {
569 warn!(%room_id, "Failed to redecrypt in-memory events {e:?}");
570 }
571 }
572 }
573
574 #[instrument(skip_all, fields(room_id, session_id))]
576 async fn retry_decryption_for_events(
577 &self,
578 room_id: &RoomId,
579 events: Vec<EventIdAndUtd>,
580 ) -> Result<(), EventCacheError> {
581 trace!("Retrying to decrypt");
582
583 if events.is_empty() {
584 trace!("No relevant events found.");
585 return Ok(());
586 }
587
588 let room = self.inner.client().ok().and_then(|client| client.get_room(room_id));
589 let push_context =
590 if let Some(room) = &room { room.push_context().await.ok().flatten() } else { None };
591
592 let mut decrypted_events = Vec::with_capacity(events.len());
594
595 for (event_id, event) in events {
596 if let Some((decrypted, actions)) = self
599 .decrypt_event(
600 room_id,
601 room.as_ref(),
602 push_context.as_ref(),
603 event.cast_ref_unchecked(),
604 )
605 .await
606 {
607 decrypted_events.push((event_id, decrypted, actions));
608 }
609 }
610
611 let event_ids: BTreeSet<_> =
612 decrypted_events.iter().map(|(event_id, _, _)| event_id).collect();
613
614 if !event_ids.is_empty() {
615 trace!(?event_ids, "Successfully redecrypted events");
616 }
617
618 self.on_resolved_utds(room_id, decrypted_events).await?;
621
622 Ok(())
623 }
624
625 async fn update_encryption_info_for_events(
627 &self,
628 room: &Room,
629 events: Vec<EventIdAndEvent>,
630 ) -> Result<(), EventCacheError> {
631 let mut updated_events = Vec::with_capacity(events.len());
633
634 for (event_id, mut event) in events {
635 if let Some(session_id) = event.encryption_info.session_id() {
636 let new_encryption_info =
637 room.get_encryption_info(session_id, &event.encryption_info.sender).await;
638
639 if let Some(new_encryption_info) = new_encryption_info
641 && event.encryption_info != new_encryption_info
642 {
643 event.encryption_info = new_encryption_info;
644 updated_events.push((event_id, event, None));
645 }
646 }
647 }
648
649 let event_ids: BTreeSet<_> =
650 updated_events.iter().map(|(event_id, _, _)| event_id).collect();
651
652 if !event_ids.is_empty() {
653 trace!(?event_ids, "Replacing the encryption info of some events");
654 }
655
656 self.on_resolved_utds(room.room_id(), updated_events).await
657 }
658
659 #[instrument(skip_all, fields(room_id, session_id))]
660 async fn update_encryption_info(
661 &self,
662 room_id: &RoomId,
663 session_id: SessionId<'_>,
664 ) -> Result<(), EventCacheError> {
665 trace!("Updating encryption info");
666
667 let Ok(client) = self.inner.client() else {
668 return Ok(());
669 };
670
671 let Some(room) = client.get_room(room_id) else {
672 return Ok(());
673 };
674
675 let events = self.all_decrypted_events(room_id, session_id).await?;
677
678 if events.is_empty() {
679 trace!("No relevant events found.");
680 return Ok(());
681 }
682
683 self.update_encryption_info_for_events(&room, events).await
685 }
686
687 async fn retry_update_encryption_info_for_in_memory_events(&self) {
688 let decrypted_events = self.all_in_memory_decrypted_events().await;
689
690 for (room_id, events) in decrypted_events.into_iter() {
691 let Some(room) = self.inner.client().ok().and_then(|c| c.get_room(&room_id)) else {
692 continue;
693 };
694
695 if let Err(e) = self.update_encryption_info_for_events(&room, events).await {
696 warn!(
697 %room_id,
698 "Failed to replace the encryption info for in-memory events {e:?}"
699 );
700 }
701 }
702 }
703
    /// Reprocesses every event currently held in memory: first the
    /// undecryptable events are retried, then the encryption info of the
    /// decrypted events is refreshed.
    async fn retry_in_memory_events(&self) {
        self.retry_decryption_for_in_memory_events().await;
        self.retry_update_encryption_info_for_in_memory_events().await;
    }
718
719 pub fn request_decryption(&self, request: DecryptionRetryRequest) {
760 let _ =
761 self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err(
762 |_| warn!("Requesting a decryption while the redecryption task has been shut down"),
763 );
764 }
765
766 pub fn subscribe_to_decryption_reports(
817 &self,
818 ) -> impl Stream<Item = Result<RedecryptorReport, BroadcastStreamRecvError>> {
819 BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe())
820 }
821}
822
823#[inline(always)]
824fn upgrade_event_cache(cache: &Weak<EventCacheInner>) -> Option<EventCache> {
825 cache.upgrade().map(|inner| EventCache { inner })
826}
827
828async fn send_report_and_retry_memory_events(
829 cache: &Weak<EventCacheInner>,
830 report: RedecryptorReport,
831) -> Result<(), ()> {
832 let Some(cache) = upgrade_event_cache(cache) else {
833 return Err(());
834 };
835
836 cache.retry_in_memory_events().await;
837 let _ = cache.inner.redecryption_channels.utd_reporter.send(report);
838
839 Ok(())
840}
841
/// Owner of the background task that listens for room keys, withheld
/// notices, cache updates, backup state changes and explicit requests, and
/// retries decryption accordingly.
pub(crate) struct Redecryptor {
    /// Handle of the background task; it is created with `abort_on_drop()`,
    /// so the handle is kept here for as long as the task should run.
    _task: BackgroundTaskHandle,
}
851
852impl Redecryptor {
853 pub(super) fn new(
858 client: &Client,
859 cache: Weak<EventCacheInner>,
860 receiver: UnboundedReceiver<DecryptionRetryRequest>,
861 linked_chunk_update_sender: &Sender<RoomEventCacheLinkedChunkUpdate>,
862 ) -> Self {
863 let linked_chunk_stream = BroadcastStream::new(linked_chunk_update_sender.subscribe());
864 let backup_state_stream = client.encryption().backups().state_stream();
865
866 let task = client
867 .task_monitor()
868 .spawn_infinite_task("event_cache::redecryptor", async {
869 let request_redecryption_stream = UnboundedReceiverStream::new(receiver);
870
871 Self::listen_for_room_keys_task(
872 cache,
873 request_redecryption_stream,
874 linked_chunk_stream,
875 backup_state_stream,
876 )
877 .await;
878 })
879 .abort_on_drop();
880
881 Self { _task: task }
882 }
883
884 async fn subscribe_to_room_key_stream(
889 cache: &Weak<EventCacheInner>,
890 ) -> Option<(
891 impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>,
892 impl Stream<Item = Vec<RoomKeyWithheldInfo>>,
893 )> {
894 let event_cache = cache.upgrade()?;
895 let client = event_cache.client().ok()?;
896 let machine = client.olm_machine().await;
897
898 machine.as_ref().map(|m| {
899 (m.store().room_keys_received_stream(), m.store().room_keys_withheld_received_stream())
900 })
901 }
902
    /// Runs one generation of the redecryption event loop.
    ///
    /// Fresh room-key and withheld-info streams are subscribed to, then the
    /// loop reacts to whichever of the five inputs is ready first:
    ///
    /// * explicit redecryption requests,
    /// * newly received room keys,
    /// * newly received withheld infos,
    /// * linked chunk updates of the event cache,
    /// * backup state changes.
    ///
    /// Returns `true` when the room-key or the withheld stream ended, which
    /// tells the caller to call this function again so that new streams get
    /// created. Returns `false` when the task should stop: the streams could
    /// not be subscribed to, the event cache has been dropped, or the
    /// `select!` fell through to its `else` branch.
    async fn redecryption_loop(
        cache: &Weak<EventCacheInner>,
        decryption_request_stream: &mut Pin<&mut impl Stream<Item = DecryptionRetryRequest>>,
        events_stream: &mut Pin<
            &mut impl Stream<Item = Result<RoomEventCacheLinkedChunkUpdate, BroadcastStreamRecvError>>,
        >,
        backup_state_stream: &mut Pin<
            &mut impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
        >,
    ) -> bool {
        let Some((room_key_stream, withheld_stream)) =
            Self::subscribe_to_room_key_stream(cache).await
        else {
            return false;
        };

        pin_mut!(room_key_stream);
        pin_mut!(withheld_stream);

        loop {
            tokio::select! {
                // An explicit request: retry the decryption for the listed
                // sessions, then refresh the encryption info for the others.
                Some(request) = decryption_request_stream.next() => {
                    let Some(cache) = upgrade_event_cache(cache) else {
                        break false;
                    };

                    trace!(?request, "Received a redecryption request");

                    for session_id in request.utd_session_ids {
                        let _ = cache
                            .retry_decryption(&request.room_id, &session_id)
                            .await
                            .inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}"));
                    }

                    for session_id in request.refresh_info_session_ids {
                        let _ = cache.update_encryption_info(&request.room_id, &session_id).await.inspect_err(|e|
                            warn!(
                                room_id = %request.room_id,
                                session_id = session_id,
                                "Unable to update the encryption info {e:?}",
                            ));
                    }
                }
                // The pattern is irrefutable on purpose: the end of the
                // stream (`None`) has to be observed to ask for a restart.
                room_keys = room_key_stream.next() => {
                    match room_keys {
                        Some(Ok(room_keys)) => {
                            let Some(cache) = upgrade_event_cache(cache) else {
                                break false;
                            };

                            trace!(?room_keys, "Received new room keys");

                            // First pass: retry the UTDs of every session.
                            for key in &room_keys {
                                let _ = cache
                                    .retry_decryption(&key.room_id, &key.session_id)
                                    .await
                                    .inspect_err(|e| warn!("Error redecrypting {e:?}"));
                            }

                            // Second pass: refresh the encryption info of the
                            // events that were already decrypted.
                            for key in room_keys {
                                let _ = cache.update_encryption_info(&key.room_id, &key.session_id).await.inspect_err(|e|
                                    warn!(
                                        room_id = %key.room_id,
                                        session_id = key.session_id,
                                        "Unable to update the encryption info {e:?}",
                                    ));
                            }
                        },
                        Some(Err(_)) => {
                            warn!("The room key stream lagged, reporting the lag to our listeners");

                            // Room keys may have been missed: retry everything
                            // held in memory and tell the listeners.
                            if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
                                break false;
                            }
                        },
                        None => {
                            // The stream ended: ask the caller to resubscribe.
                            break true;
                        }
                    }
                }
                withheld_info = withheld_stream.next() => {
                    match withheld_info {
                        Some(infos) => {
                            let Some(cache) = upgrade_event_cache(cache) else {
                                break false;
                            };

                            trace!(?infos, "Received new withheld infos");

                            for RoomKeyWithheldInfo { room_id, session_id, .. } in &infos {
                                let _ = cache.update_encryption_info(room_id, session_id).await.inspect_err(|e|
                                    warn!(
                                        room_id = %room_id,
                                        session_id = session_id,
                                        "Unable to update the encryption info {e:?}",
                                    ));
                            }
                        }
                        // The stream ended: ask the caller to resubscribe.
                        None => break true,
                    }
                }
                // New or updated events in the cache: try to decrypt the UTDs
                // among them right away.
                Some(event_updates) = events_stream.next() => {
                    match event_updates {
                        Ok(updates) => {
                            let Some(cache) = upgrade_event_cache(cache) else {
                                break false;
                            };

                            let linked_chunk_id = updates.linked_chunk_id.to_owned();

                            let _ = cache.retry_decryption_for_event_cache_updates(updates).await.inspect_err(|e|
                                warn!(
                                    %linked_chunk_id,
                                    "Unable to handle UTDs from event cache updates {e:?}",
                                )
                            );
                        }
                        // Updates may have been missed: fall back to retrying
                        // everything held in memory.
                        Err(_) => {
                            if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
                                break false;
                            }
                        }
                    }
                }
                Some(backup_state_update) = backup_state_stream.next() => {
                    match backup_state_update {
                        Ok(state) => {
                            match state {
                                // Nothing to do for these states.
                                BackupState::Unknown |
                                BackupState::Creating |
                                BackupState::Enabling |
                                BackupState::Resuming |
                                BackupState::Downloading |
                                BackupState::Disabling => {
                                }
                                // The backup is usable: retry everything held
                                // in memory and tell the listeners.
                                BackupState::Enabled => {
                                    if send_report_and_retry_memory_events(cache, RedecryptorReport::BackupAvailable).await.is_err() {
                                        break false;
                                    }
                                }
                            }
                        }
                        Err(_) => {
                            if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
                                break false;
                            }
                        }
                    }
                }
                // NOTE(review): the two key-stream branches use irrefutable
                // patterns, so this fallback looks unreachable in practice --
                // confirm against the `tokio::select!` semantics.
                else => break false,
            }
        }
    }
1083
1084 async fn listen_for_room_keys_task(
1085 cache: Weak<EventCacheInner>,
1086 decryption_request_stream: UnboundedReceiverStream<DecryptionRetryRequest>,
1087 events_stream: BroadcastStream<RoomEventCacheLinkedChunkUpdate>,
1088 backup_state_stream: impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
1089 ) {
1090 pin_mut!(decryption_request_stream);
1094 pin_mut!(events_stream);
1095 pin_mut!(backup_state_stream);
1096
1097 while Self::redecryption_loop(
1098 &cache,
1099 &mut decryption_request_stream,
1100 &mut events_stream,
1101 &mut backup_state_stream,
1102 )
1103 .await
1104 {
1105 info!("Regenerating the re-decryption streams");
1106
1107 if send_report_and_retry_memory_events(&cache, RedecryptorReport::Lagging)
1110 .await
1111 .is_err()
1112 {
1113 break;
1114 }
1115 }
1116
1117 info!("Shutting down the event cache redecryptor");
1118 }
1119}
1120
1121#[cfg(not(target_family = "wasm"))]
1122#[cfg(test)]
1123mod tests {
1124 use std::{
1125 collections::BTreeSet,
1126 sync::{
1127 Arc,
1128 atomic::{AtomicBool, Ordering},
1129 },
1130 time::Duration,
1131 };
1132
1133 use assert_matches2::assert_matches;
1134 use async_trait::async_trait;
1135 use eyeball_im::VectorDiff;
1136 use matrix_sdk_base::{
1137 cross_process_lock::CrossProcessLockGeneration,
1138 crypto::types::events::{ToDeviceEvent, room::encrypted::ToDeviceEncryptedEventContent},
1139 deserialized_responses::{TimelineEventKind, VerificationState},
1140 event_cache::{
1141 Event, Gap,
1142 store::{EventCacheStore, EventCacheStoreError, MemoryStore},
1143 },
1144 linked_chunk::{
1145 ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
1146 RawChunk, Update,
1147 },
1148 locks::Mutex,
1149 sleep::sleep,
1150 store::StoreConfig,
1151 };
1152 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
1153 use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
1154 use ruma::{
1155 EventId, OwnedEventId, RoomId, RoomVersionId, device_id, event_id,
1156 events::{AnySyncTimelineEvent, relation::RelationType},
1157 room_id,
1158 serde::Raw,
1159 user_id,
1160 };
1161 use serde_json::json;
1162 use tokio::sync::oneshot::{self, Sender};
1163 use tracing::{Instrument, info};
1164
1165 use crate::{
1166 Client, assert_let_timeout,
1167 encryption::EncryptionSettings,
1168 event_cache::{
1169 DecryptionRetryRequest, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
1170 TimelineVectorDiffs,
1171 },
1172 test_utils::mocks::MatrixMockServer,
1173 };
1174
    /// An event cache store for tests that forwards everything to a
    /// [`MemoryStore`], but holds back linked chunk updates until
    /// [`DelayingStore::stop_delaying`] is called.
    #[derive(Debug, Clone)]
    struct DelayingStore {
        /// The store doing the actual work.
        memory_store: MemoryStore,
        /// While `true`, `handle_linked_chunk_updates` keeps sleeping instead
        /// of persisting the updates.
        delaying: Arc<AtomicBool>,
        /// Sender installed by `stop_delaying`; the next completed
        /// `handle_linked_chunk_updates` call takes it and signals through
        /// it.
        foo: Arc<Mutex<Option<Sender<()>>>>,
    }
1185
1186 impl DelayingStore {
1187 fn new() -> Self {
1188 Self {
1189 memory_store: MemoryStore::new(),
1190 delaying: AtomicBool::new(true).into(),
1191 foo: Arc::new(Mutex::new(None)),
1192 }
1193 }
1194
1195 async fn stop_delaying(&self) {
1196 let (sender, receiver) = oneshot::channel();
1197
1198 {
1199 *self.foo.lock() = Some(sender);
1200 }
1201
1202 self.delaying.store(false, Ordering::SeqCst);
1203
1204 receiver.await.expect("We should be able to receive a response")
1205 }
1206 }
1207
1208 #[cfg_attr(target_family = "wasm", async_trait(?Send))]
1209 #[cfg_attr(not(target_family = "wasm"), async_trait)]
1210 impl EventCacheStore for DelayingStore {
1211 type Error = EventCacheStoreError;
1212
1213 async fn close(&self) -> Result<(), EventCacheStoreError> {
1214 self.memory_store.close().await
1215 }
1216
1217 async fn reopen(&self) -> Result<(), EventCacheStoreError> {
1218 self.memory_store.reopen().await
1219 }
1220
1221 async fn try_take_leased_lock(
1222 &self,
1223 lease_duration_ms: u32,
1224 key: &str,
1225 holder: &str,
1226 ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
1227 self.memory_store.try_take_leased_lock(lease_duration_ms, key, holder).await
1228 }
1229
1230 async fn handle_linked_chunk_updates(
1231 &self,
1232 linked_chunk_id: LinkedChunkId<'_>,
1233 updates: Vec<Update<Event, Gap>>,
1234 ) -> Result<(), Self::Error> {
1235 while self.delaying.load(Ordering::SeqCst) {
1241 sleep(Duration::from_millis(10)).await;
1242 }
1243
1244 let sender = self.foo.lock().take();
1245 let ret = self.memory_store.handle_linked_chunk_updates(linked_chunk_id, updates).await;
1246
1247 if let Some(sender) = sender {
1248 sender.send(()).expect("We should be able to notify the other side that we're done with the storage operation");
1249 }
1250
1251 ret
1252 }
1253
1254 async fn load_all_chunks(
1255 &self,
1256 linked_chunk_id: LinkedChunkId<'_>,
1257 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
1258 self.memory_store.load_all_chunks(linked_chunk_id).await
1259 }
1260
1261 async fn load_all_chunks_metadata(
1262 &self,
1263 linked_chunk_id: LinkedChunkId<'_>,
1264 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
1265 self.memory_store.load_all_chunks_metadata(linked_chunk_id).await
1266 }
1267
1268 async fn load_last_chunk(
1269 &self,
1270 linked_chunk_id: LinkedChunkId<'_>,
1271 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
1272 self.memory_store.load_last_chunk(linked_chunk_id).await
1273 }
1274
1275 async fn load_previous_chunk(
1276 &self,
1277 linked_chunk_id: LinkedChunkId<'_>,
1278 before_chunk_identifier: ChunkIdentifier,
1279 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
1280 self.memory_store.load_previous_chunk(linked_chunk_id, before_chunk_identifier).await
1281 }
1282
1283 async fn clear_all_events(&self) -> Result<(), Self::Error> {
1284 self.memory_store.clear_all_events().await
1285 }
1286
1287 async fn filter_duplicated_events(
1288 &self,
1289 linked_chunk_id: LinkedChunkId<'_>,
1290 events: Vec<OwnedEventId>,
1291 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
1292 self.memory_store.filter_duplicated_events(linked_chunk_id, events).await
1293 }
1294
1295 async fn find_event(
1296 &self,
1297 room_id: &RoomId,
1298 event_id: &EventId,
1299 ) -> Result<Option<Event>, Self::Error> {
1300 self.memory_store.find_event(room_id, event_id).await
1301 }
1302
1303 async fn find_event_relations(
1304 &self,
1305 room_id: &RoomId,
1306 event_id: &EventId,
1307 filters: Option<&[RelationType]>,
1308 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1309 self.memory_store.find_event_relations(room_id, event_id, filters).await
1310 }
1311
1312 async fn get_room_events(
1313 &self,
1314 room_id: &RoomId,
1315 event_type: Option<&str>,
1316 session_id: Option<&str>,
1317 ) -> Result<Vec<Event>, Self::Error> {
1318 self.memory_store.get_room_events(room_id, event_type, session_id).await
1319 }
1320
1321 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1322 self.memory_store.save_event(room_id, event).await
1323 }
1324
1325 async fn optimize(&self) -> Result<(), Self::Error> {
1326 self.memory_store.optimize().await
1327 }
1328
1329 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1330 self.memory_store.get_size().await
1331 }
1332 }
1333
    /// Creates two end-to-end-encryption-capable clients, Alice and Bob, both
    /// joined to an encrypted room with ID `room_id`.
    ///
    /// * `alice_enables_cross_signing` - whether Alice automatically enables
    ///   cross-signing; Bob always does.
    /// * `use_delayed_store` - whether Bob's event cache is backed by a
    ///   [`DelayingStore`], which is then returned as the last tuple element.
    ///
    /// Returns `(alice, bob, mock server, optional delaying store)`.
    async fn set_up_clients(
        room_id: &RoomId,
        alice_enables_cross_signing: bool,
        use_delayed_store: bool,
    ) -> (Client, Client, MatrixMockServer, Option<DelayingStore>) {
        // One tracing span per client so that their logs can be told apart.
        let alice_span = tracing::info_span!("alice");
        let bob_span = tracing::info_span!("bob");

        let alice_user_id = user_id!("@alice:localhost");
        let alice_device_id = device_id!("ALICEDEVICE");
        let bob_user_id = user_id!("@bob:localhost");
        let bob_device_id = device_id!("BOBDEVICE");

        let matrix_mock_server = MatrixMockServer::new().await;
        matrix_mock_server.mock_crypto_endpoints_preset().await;

        let encryption_settings = EncryptionSettings {
            auto_enable_cross_signing: alice_enables_cross_signing,
            ..Default::default()
        };

        let alice = matrix_mock_server
            .client_builder_for_crypto_end_to_end(alice_user_id, alice_device_id)
            .on_builder(|builder| {
                builder
                    .with_enable_share_history_on_invite(true)
                    .with_encryption_settings(encryption_settings)
            })
            .build()
            .instrument(alice_span.clone())
            .await;

        let encryption_settings =
            EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() };

        // Bob optionally gets the delaying event cache store; a clone of it is
        // returned so that the test can lift the delay later on.
        let (store_config, store) = if use_delayed_store {
            let store = DelayingStore::new();

            (
                StoreConfig::new(CrossProcessLockConfig::multi_process(
                    "delayed_store_event_cache_test",
                ))
                .event_cache_store(store.clone()),
                Some(store),
            )
        } else {
            (
                StoreConfig::new(CrossProcessLockConfig::multi_process(
                    "normal_store_event_cache_test",
                )),
                None,
            )
        };

        let bob = matrix_mock_server
            .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id)
            .on_builder(|builder| {
                builder
                    .with_enable_share_history_on_invite(true)
                    .with_encryption_settings(encryption_settings)
                    .store_config(store_config)
            })
            .build()
            .instrument(bob_span.clone())
            .await;

        // Only Bob's event cache is under test.
        bob.event_cache().subscribe().expect("Bob should be able to enable the event cache");

        matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await;

        let event_factory = EventFactory::new().room(room_id).sender(alice_user_id);

        // The room is created by Alice and has encryption enabled.
        let room_builder = JoinedRoomBuilder::new(room_id)
            .add_state_event(event_factory.create(alice_user_id, RoomVersionId::V1))
            .add_state_event(event_factory.room_encryption());

        // Both clients learn about the room through a sync.
        matrix_mock_server
            .mock_sync()
            .ok_and_run(&alice, |builder| {
                builder.add_joined_room(room_builder.clone());
            })
            .instrument(alice_span)
            .await;

        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_joined_room(room_builder);
            })
            .instrument(bob_span)
            .await;

        (alice, bob, matrix_mock_server, store)
    }
1432
    /// Makes Alice send an `m.room.message` event to the room and captures
    /// what she sent.
    ///
    /// Returns the event captured by the "send" mock and the to-device event
    /// captured by the "put to-device" mock.
    // NOTE(review): the to-device event is presumably the one carrying the
    // room key for the message -- confirm against the mock server helpers.
    async fn prepare_room(
        matrix_mock_server: &MatrixMockServer,
        event_factory: &EventFactory,
        alice: &Client,
        bob: &Client,
        room_id: &RoomId,
    ) -> (Raw<AnySyncTimelineEvent>, Raw<ToDeviceEvent<ToDeviceEncryptedEventContent>>) {
        let alice_user_id = alice.user_id().unwrap();
        let bob_user_id = bob.user_id().unwrap();

        let alice_member_event = event_factory.member(alice_user_id).into_raw();
        let bob_member_event = event_factory.member(bob_user_id).into_raw();

        let room = alice
            .get_room(room_id)
            .expect("Alice should have access to the room now that we synced");

        let event_type = "m.room.message";
        let content = json!({"body": "It's a secret to everybody", "msgtype": "m.text"});

        let event_id = event_id!("$some_id");
        // Set up the capture of the sent event and of the to-device request
        // before anything is sent.
        let (event_receiver, mock) =
            matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id);
        let (_guard, room_key) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await;

        {
            // The "send" mock is mounted only for the duration of this block.
            let _guard = mock.mock_once().mount_as_scoped().await;

            // The member list served to Alice contains both Alice and Bob.
            matrix_mock_server
                .mock_get_members()
                .ok(vec![alice_member_event.clone(), bob_member_event.clone()])
                .mock_once()
                .mount()
                .await;

            room.send_raw(event_type, content)
                .await
                .expect("We should be able to send an initial message");
        };

        let event = event_receiver.await.expect("Alice should have sent the event by now");
        let room_key = room_key.await;

        (event, room_key)
    }
1483
1484 #[async_test]
1485 async fn test_redecryptor() {
1486 let room_id = room_id!("!test:localhost");
1487
1488 let event_factory = EventFactory::new().room(room_id);
1489 let (alice, bob, matrix_mock_server, _) = set_up_clients(room_id, true, false).await;
1490
1491 let (event, room_key) =
1492 prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;
1493
1494 let event_cache = bob.event_cache();
1497 let (room_cache, _) = event_cache
1498 .room(room_id)
1499 .await
1500 .expect("We should be able to get to the event cache for a specific room");
1501
1502 let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
1503 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1504
1505 bob.inner
1508 .base_client
1509 .regenerate_olm(None)
1510 .await
1511 .expect("We should be able to regenerate the Olm machine");
1512
1513 matrix_mock_server
1515 .mock_sync()
1516 .ok_and_run(&bob, |builder| {
1517 builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
1518 })
1519 .await;
1520
1521 assert_let_timeout!(
1524 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1525 subscriber.recv()
1526 );
1527
1528 assert_eq!(diffs.len(), 1);
1531 assert_matches!(&diffs[0], VectorDiff::Append { values });
1532 assert_eq!(values.len(), 1);
1533 assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });
1534
1535 assert_let_timeout!(
1536 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1537 );
1538 assert_eq!(expected_room_id, room_id);
1539 assert!(generic_stream.is_empty());
1540
1541 matrix_mock_server
1543 .mock_sync()
1544 .ok_and_run(&bob, |builder| {
1545 builder.add_to_device_event(
1546 room_key
1547 .deserialize_as()
1548 .expect("We should be able to deserialize the room key"),
1549 );
1550 })
1551 .await;
1552
1553 assert_let_timeout!(
1555 Duration::from_secs(1),
1556 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1557 subscriber.recv()
1558 );
1559
1560 assert_eq!(diffs.len(), 1);
1562 assert_matches!(&diffs[0], VectorDiff::Set { index, value });
1563 assert_eq!(*index, 0);
1564 assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
1565
1566 assert_let_timeout!(
1567 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1568 );
1569 assert_eq!(expected_room_id, room_id);
1570 assert!(generic_stream.is_empty());
1571 }
1572
1573 #[async_test]
1574 async fn test_redecryptor_updating_encryption_info() {
1575 let bob_span = tracing::info_span!("bob");
1576
1577 let room_id = room_id!("!test:localhost");
1578
1579 let event_factory = EventFactory::new().room(room_id);
1580 let (alice, bob, matrix_mock_server, _) = set_up_clients(room_id, false, false).await;
1581
1582 let (event, room_key) =
1583 prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;
1584
1585 let event_cache = bob.event_cache();
1588 let (room_cache, _) = event_cache
1589 .room(room_id)
1590 .instrument(bob_span.clone())
1591 .await
1592 .expect("We should be able to get to the event cache for a specific room");
1593
1594 let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
1595 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1596
1597 matrix_mock_server
1599 .mock_sync()
1600 .ok_and_run(&bob, |builder| {
1601 builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
1602 })
1603 .instrument(bob_span.clone())
1604 .await;
1605
1606 assert_let_timeout!(
1609 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1610 subscriber.recv()
1611 );
1612
1613 assert_eq!(diffs.len(), 1);
1616 assert_matches!(&diffs[0], VectorDiff::Append { values });
1617 assert_eq!(values.len(), 1);
1618 assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });
1619
1620 assert_let_timeout!(
1621 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1622 );
1623 assert_eq!(expected_room_id, room_id);
1624 assert!(generic_stream.is_empty());
1625
1626 matrix_mock_server
1628 .mock_sync()
1629 .ok_and_run(&bob, |builder| {
1630 builder.add_to_device_event(
1631 room_key
1632 .deserialize_as()
1633 .expect("We should be able to deserialize the room key"),
1634 );
1635 })
1636 .instrument(bob_span.clone())
1637 .await;
1638
1639 assert_let_timeout!(
1641 Duration::from_secs(1),
1642 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1643 subscriber.recv()
1644 );
1645
1646 assert_eq!(diffs.len(), 1);
1648 assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value });
1649 assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
1650
1651 let encryption_info = value.encryption_info().unwrap();
1652 assert_matches!(&encryption_info.verification_state, VerificationState::Unverified(_));
1653
1654 assert_let_timeout!(
1655 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1656 );
1657 assert_eq!(expected_room_id, room_id);
1658 assert!(generic_stream.is_empty());
1659
1660 let session_id = encryption_info.session_id().unwrap().to_owned();
1661 let alice_user_id = alice.user_id().unwrap();
1662
1663 alice
1665 .encryption()
1666 .bootstrap_cross_signing(None)
1667 .await
1668 .expect("Alice should be able to create the cross-signing keys");
1669
1670 bob.update_tracked_users_for_testing([alice_user_id]).instrument(bob_span.clone()).await;
1671 matrix_mock_server
1672 .mock_sync()
1673 .ok_and_run(&bob, |builder| {
1674 builder.add_change_device(alice_user_id);
1675 })
1676 .instrument(bob_span.clone())
1677 .await;
1678
1679 bob.event_cache().request_decryption(DecryptionRetryRequest {
1680 room_id: room_id.into(),
1681 utd_session_ids: BTreeSet::new(),
1682 refresh_info_session_ids: BTreeSet::from([session_id]),
1683 });
1684
1685 assert_let_timeout!(
1688 Duration::from_secs(1),
1689 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1690 subscriber.recv()
1691 );
1692
1693 assert_eq!(diffs.len(), 1);
1694 assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value });
1695 assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
1696 let encryption_info = value.encryption_info().unwrap();
1697
1698 assert_matches!(
1699 &encryption_info.verification_state,
1700 VerificationState::Unverified(_),
1701 "The event should now know about the identity but still be unverified"
1702 );
1703
1704 assert_let_timeout!(
1705 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1706 );
1707 assert_eq!(expected_room_id, room_id);
1708 assert!(generic_stream.is_empty());
1709 }
1710
1711 #[async_test]
1712 async fn test_event_is_redecrypted_even_if_key_arrives_while_event_processing() {
1713 let room_id = room_id!("!test:localhost");
1714
1715 let event_factory = EventFactory::new().room(room_id);
1716 let (alice, bob, matrix_mock_server, delayed_store) =
1717 set_up_clients(room_id, true, true).await;
1718
1719 let delayed_store = delayed_store.unwrap();
1720
1721 let (event, room_key) =
1722 prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;
1723
1724 let event_cache = bob.event_cache();
1725
1726 let (room_cache, _) = event_cache
1728 .room(room_id)
1729 .await
1730 .expect("We should be able to get to the event cache for a specific room");
1731
1732 let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
1733 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1734
1735 matrix_mock_server
1737 .mock_sync()
1738 .ok_and_run(&bob, |builder| {
1739 builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
1740 })
1741 .await;
1742
1743 matrix_mock_server
1745 .mock_sync()
1746 .ok_and_run(&bob, |builder| {
1747 builder.add_to_device_event(
1748 room_key
1749 .deserialize_as()
1750 .expect("We should be able to deserialize the room key"),
1751 );
1752 })
1753 .await;
1754
1755 info!("Stopping the delay");
1756 delayed_store.stop_delaying().await;
1757
1758 assert_let_timeout!(
1764 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1765 subscriber.recv()
1766 );
1767
1768 assert_eq!(diffs.len(), 1);
1771 assert_matches!(&diffs[0], VectorDiff::Append { values });
1772 assert_eq!(values.len(), 1);
1773 assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });
1774
1775 assert_let_timeout!(
1777 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1778 );
1779 assert_eq!(expected_room_id, room_id);
1780
1781 assert_let_timeout!(
1783 Duration::from_secs(1),
1784 Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1785 subscriber.recv()
1786 );
1787
1788 assert_eq!(diffs.len(), 1);
1790 assert_matches!(&diffs[0], VectorDiff::Set { index, value });
1791 assert_eq!(*index, 0);
1792 assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
1793
1794 assert_let_timeout!(
1796 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1797 );
1798 assert_eq!(expected_room_id, room_id);
1799 assert!(generic_stream.is_empty());
1800 }
1801}