Skip to main content

matrix_sdk/event_cache/
redecryptor.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The Redecryptor (affectionately known as R2D2) is a layer and long-running
16//! background task which handles redecryption of events in case we couldn't
17//! decrypt them immediately.
18//!
19//! There are various reasons why a room key might not be available immediately
20//! when the event becomes available:
21//!     - The to-device message containing the room key just arrives late, i.e.
22//!       after the room event.
23//!     - The event is a historic event and we need to first download the room
24//!       key from the backup.
25//!     - The event is a historic event in a previously unjoined room, we need
26//!       to receive historic room keys as defined in [MSC3061].
27//!
28//! R2D2 listens to the [`OlmMachine`] for received room keys and new
29//! m.room_key.withheld events.
30//!
31//! If a new room key has been received, it attempts to find any UTDs in the
32//! [`EventCache`]. If R2D2 decrypts any UTDs from the event cache, it will
33//! replace the events in the cache and send out new [`RoomEventCacheUpdate`]s
34//! to any of its listeners.
35//!
36//! If a new withheld info has been received, it attempts to find any relevant
37//! events and updates the [`EncryptionInfo`] of an event.
38//!
39//! There's an additional gotcha: the [`OlmMachine`] might get recreated by
40//! calls to [`BaseClient::regenerate_olm()`]. When this happens, we will
41//! receive a `None` on the room keys stream and we need to re-listen to it.
42//!
43//! Another gotcha is that room keys might be received on another process if the
//! [`Client`] is operating on an Apple iOS device. A separate process is used
45//! in this case to receive push notifications. In this case, the room key will
46//! be received and R2D2 won't get notified about it. To work around this,
47//! decryption requests can be explicitly sent to R2D2.
48//!
49//! The final gotcha is that a room key might be received just in between the
50//! time the event was initially tried to be decrypted and the time it took to
51//! persist it in the event cache. To handle this race condition, R2D2 listens
52//! to the event cache and attempts to decrypt any UTDs the event cache
53//! persists.
54//!
55//! In the graph below, the Timeline block is meant to be the `Timeline` from
56//! the `matrix-sdk-ui` crate, but it could be any other listener that
//! subscribes to the [`RedecryptorReport`] stream.
58//!
59//! ```markdown
60//! 
61//!      .----------------------.
62//!     |                        |
63//!     |      Beeb, boop!       |
64//!     |                        .
65//!      ----------------------._ \
66//!                               -;  _____
67//!                                 .`/L|__`.
68//!                                / =[_]O|` \
69//!                                |"+_____":|
70//!                              __:='|____`-:__
71//!                             ||[] ||====|| []||
72//!                             ||[] ||====|| []||
73//!                             |:== ||====|| ==:|
74//!                             ||[] ||====|| []||
75//!                             ||[] ||====|| []||
76//!                            _||_  ||====||  _||_
77//!                           (====) |:====:| (====)
78//!                            }--{  | |  | |  }--{
79//!                           (____) |_|  |_| (____)
80//!
81//!                              ┌─────────────┐
82//!                              │             │
83//!                  ┌───────────┤   Timeline  │◄────────────┐
84//!                  │           │             │             │
85//!                  │           └──────▲──────┘             │
86//!                  │                  │                    │
87//!                  │                  │                    │
88//!                  │                  │                    │
89//!              Decryption             │                Redecryptor
90//!                request              │                  report
91//!                  │        RoomEventCacheUpdates          │
92//!                  │                  │                    │
93//!                  │                  │                    │
94//!                  │      ┌───────────┴───────────┐        │
95//!                  │      │                       │        │
96//!                  └──────►         R2D2          │────────┘
97//!                         │                       │
98//!                         └──▲─────────────────▲──┘
99//!                            │                 │
100//!                            │                 │
101//!                            │                 │
102//!                         Received        Received room
103//!                          events          keys stream
104//!                            │                 │
105//!                            │                 │
106//!                            │                 │
107//!                    ┌───────┴──────┐  ┌───────┴──────┐
108//!                    │              │  │              │
109//!                    │  Event Cache │  │  OlmMachine  │
110//!                    │              │  │              │
111//!                    └──────────────┘  └──────────────┘
112//! ```
113//!
114//! [MSC3061]: https://github.com/matrix-org/matrix-spec/pull/1655#issuecomment-2213152255
115
116use std::{
117    collections::{BTreeMap, BTreeSet},
118    pin::Pin,
119    sync::Weak,
120};
121
122use as_variant::as_variant;
123use futures_core::Stream;
124use futures_util::{StreamExt, future::join_all, pin_mut};
125#[cfg(doc)]
126use matrix_sdk_base::{BaseClient, crypto::OlmMachine};
127use matrix_sdk_base::{
128    crypto::{
129        store::types::{RoomKeyInfo, RoomKeyWithheldInfo},
130        types::events::room::encrypted::EncryptedEvent,
131    },
132    deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind},
133    event_cache::store::EventCacheStoreLockState,
134    locks::Mutex,
135    task_monitor::BackgroundTaskHandle,
136    timer,
137};
138#[cfg(doc)]
139use matrix_sdk_common::deserialized_responses::EncryptionInfo;
140use ruma::{
141    OwnedEventId, OwnedRoomId, RoomId,
142    events::{AnySyncTimelineEvent, room::encrypted::OriginalSyncRoomEncryptedEvent},
143    push::Action,
144    serde::Raw,
145};
146use tokio::sync::{
147    broadcast::{self, Sender},
148    mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
149};
150use tokio_stream::wrappers::{
151    BroadcastStream, UnboundedReceiverStream, errors::BroadcastStreamRecvError,
152};
153use tracing::{info, instrument, trace, warn};
154
155#[cfg(doc)]
156use super::RoomEventCache;
157use super::{
158    EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheGenericUpdate,
159    RoomEventCacheUpdate, TimelineVectorDiffs,
160    caches::room::{PostProcessingOrigin, RoomEventCacheLinkedChunkUpdate},
161};
162use crate::{Client, Result, Room, encryption::backups::BackupState, room::PushContext};
163
/// A borrowed session ID, i.e. the unique ID of the room key that was used to
/// encrypt an event.
type SessionId<'a> = &'a str;
/// The owned variant of [`SessionId`].
type OwnedSessionId = String;

/// An event we were unable to decrypt, paired with its event ID.
type EventIdAndUtd = (OwnedEventId, Raw<AnySyncTimelineEvent>);
/// An already-decrypted event, paired with its event ID.
type EventIdAndEvent = (OwnedEventId, DecryptedRoomEvent);
/// A successfully redecrypted event: its ID, the decrypted payload, and the
/// push actions that were computed for it, if any (push actions are only
/// available when a push context could be constructed).
pub(in crate::event_cache) type ResolvedUtd =
    (OwnedEventId, DecryptedRoomEvent, Option<Vec<Action>>);
171
/// The information sent across the channel to the long-running task requesting
/// that the supplied set of sessions be retried.
#[derive(Debug, Clone)]
pub struct DecryptionRetryRequest {
    /// The room ID of the room the events belong to.
    pub room_id: OwnedRoomId,
    /// Session IDs of room keys for events that are not decrypted (UTDs) and
    /// whose decryption should be retried.
    pub utd_session_ids: BTreeSet<OwnedSessionId>,
    /// Session IDs for events that are decrypted but might need to have their
    /// [`EncryptionInfo`] refreshed.
    pub refresh_info_session_ids: BTreeSet<OwnedSessionId>,
}
184
/// A report coming from the redecryptor.
#[derive(Debug, Clone)]
pub enum RedecryptorReport {
    /// Events which we were able to decrypt.
    ResolvedUtds {
        /// The room ID of the room the events belong to.
        room_id: OwnedRoomId,
        /// The list of event IDs of the decrypted events.
        events: BTreeSet<OwnedEventId>,
    },
    /// The redecryptor might have missed some room keys so it might not have
    /// re-decrypted events that are now decryptable.
    ///
    /// Listeners may react by explicitly requesting redecryption of the events
    /// they care about, see [`EventCache::request_decryption`].
    Lagging,
    /// A room key backup has become available.
    ///
    /// This means that components might want to tell R2D2 about events they
    /// care about to attempt a decryption.
    BackupAvailable,
}
204
/// The channels used to communicate with the redecryption task.
pub(super) struct RedecryptorChannels {
    // Broadcast channel over which [`RedecryptorReport`]s are published to
    // subscribers.
    utd_reporter: Sender<RedecryptorReport>,
    // Sender half used to hand explicit [`DecryptionRetryRequest`]s to the
    // long-running task (see [`EventCache::request_decryption`]).
    pub(super) decryption_request_sender: UnboundedSender<DecryptionRetryRequest>,
    // Receiver half, wrapped in `Mutex<Option<..>>` so it can be moved out
    // once — NOTE(review): presumably taken by the background task when it
    // starts; the taker is not visible in this chunk.
    pub(super) decryption_request_receiver:
        Mutex<Option<UnboundedReceiver<DecryptionRetryRequest>>>,
}
211
212impl RedecryptorChannels {
213    pub(super) fn new() -> Self {
214        let (utd_reporter, _) = broadcast::channel(100);
215        let (decryption_request_sender, decryption_request_receiver) = unbounded_channel();
216
217        Self {
218            utd_reporter,
219            decryption_request_sender,
220            decryption_request_receiver: Mutex::new(Some(decryption_request_receiver)),
221        }
222    }
223}
224
225/// A function which can be used to filter and map [`TimelineEvent`]s into a
226/// tuple of event ID and raw [`AnySyncTimelineEvent`].
227///
228/// The tuple can be used to attempt to redecrypt events.
229fn filter_timeline_event_to_utd(
230    event: TimelineEvent,
231) -> Option<(OwnedEventId, Raw<AnySyncTimelineEvent>)> {
232    let event_id = event.event_id();
233
234    // Only pick out events that are UTDs, get just the Raw event as this is what
235    // the OlmMachine needs.
236    let event = as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event);
237    // Zip the event ID and event together so we don't have to pick out the event ID
238    // again. We need the event ID to replace the event in the cache.
239    event_id.zip(event)
240}
241
242/// A function which can be used to filter an map [`TimelineEvent`]s into a
243/// tuple of event ID and [`DecryptedRoomEvent`].
244///
245/// The tuple can be used to attempt to update the encryption info of the
246/// decrypted event.
247fn filter_timeline_event_to_decrypted(
248    event: TimelineEvent,
249) -> Option<(OwnedEventId, DecryptedRoomEvent)> {
250    let event_id = event.event_id();
251
252    let event = as_variant!(event.kind, TimelineEventKind::Decrypted(event) => event);
253    // Zip the event ID and event together so we don't have to pick out the event ID
254    // again. We need the event ID to replace the event in the cache.
255    event_id.zip(event)
256}
257
258impl EventCache {
259    /// Retrieve a set of events that we weren't able to decrypt.
260    ///
261    /// # Arguments
262    ///
263    /// * `room_id` - The ID of the room where the events were sent to.
264    /// * `session_id` - The unique ID of the room key that was used to encrypt
265    ///   the event.
266    async fn get_utds(
267        &self,
268        room_id: &RoomId,
269        session_id: SessionId<'_>,
270    ) -> Result<Vec<EventIdAndUtd>, EventCacheError> {
271        let events = match self.inner.store.lock().await? {
272            // If the lock is clean, no problem.
273            // If the lock is dirty, it doesn't really matter as we are hitting the store
274            // directly, there is no in-memory state to manage, so all good. Do not mark the lock as
275            // non-dirty.
276            EventCacheStoreLockState::Clean(guard) | EventCacheStoreLockState::Dirty(guard) => {
277                guard.get_room_events(room_id, Some("m.room.encrypted"), Some(session_id)).await?
278            }
279        };
280
281        Ok(events.into_iter().filter_map(filter_timeline_event_to_utd).collect())
282    }
283
284    /// Retrieve a set of events that we weren't able to decrypt from the memory
285    /// of the event cache.
286    async fn get_utds_from_memory(&self) -> BTreeMap<OwnedRoomId, Vec<EventIdAndUtd>> {
287        let mut utds = BTreeMap::new();
288
289        for (room_id, caches) in self.inner.by_room.read().await.iter() {
290            let room_utds: Vec<_> = caches
291                .all_events()
292                .await
293                .into_iter()
294                .flatten()
295                .filter_map(filter_timeline_event_to_utd)
296                .collect();
297
298            utds.insert(room_id.to_owned(), room_utds);
299        }
300
301        utds
302    }
303
304    async fn get_decrypted_events(
305        &self,
306        room_id: &RoomId,
307        session_id: SessionId<'_>,
308    ) -> Result<Vec<EventIdAndEvent>, EventCacheError> {
309        let events = match self.inner.store.lock().await? {
310            // If the lock is clean, no problem.
311            // If the lock is dirty, it doesn't really matter as we are hitting the store
312            // directly, there is no in-memory state to manage, so all good. Do not mark the lock as
313            // non-dirty.
314            EventCacheStoreLockState::Clean(guard) | EventCacheStoreLockState::Dirty(guard) => {
315                guard.get_room_events(room_id, None, Some(session_id)).await?
316            }
317        };
318
319        Ok(events.into_iter().filter_map(filter_timeline_event_to_decrypted).collect())
320    }
321
322    async fn get_decrypted_events_from_memory(
323        &self,
324    ) -> BTreeMap<OwnedRoomId, Vec<EventIdAndEvent>> {
325        let mut decrypted_events = BTreeMap::new();
326
327        for (room_id, caches) in self.inner.by_room.read().await.iter() {
328            let room_utds: Vec<_> = caches
329                .all_events()
330                .await
331                .into_iter()
332                .flatten()
333                .filter_map(filter_timeline_event_to_decrypted)
334                .collect();
335
336            decrypted_events.insert(room_id.to_owned(), room_utds);
337        }
338
339        decrypted_events
340    }
341
    /// Handle a chunk of events that we were previously unable to decrypt but
    /// have now successfully decrypted.
    ///
    /// This function will replace the existing UTD events in memory and the
    /// store and send out a [`RoomEventCacheUpdate`] for the newly
    /// decrypted events. It finishes by broadcasting a
    /// [`RedecryptorReport::ResolvedUtds`] report.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The ID of the room where the events were sent to.
    /// * `events` - A chunk of events that were successfully decrypted.
    #[instrument(skip_all, fields(room_id))]
    async fn on_resolved_utds(
        &self,
        room_id: &RoomId,
        events: Vec<ResolvedUtd>,
    ) -> Result<(), EventCacheError> {
        if events.is_empty() {
            trace!("No events were redecrypted or updated, nothing to replace");
            return Ok(());
        }

        timer!("Resolving UTDs");

        // Get the cache for this particular room.
        let (room_cache, _drop_handles) = self.for_room(room_id).await?;

        // Collect the event IDs up front; they are broadcast in the report at
        // the end of this method.
        let event_ids: BTreeSet<_> =
            events.iter().cloned().map(|(event_id, _, _)| event_id).collect();

        // Phase 1: under the room state write lock, collect cache handles and
        // perform all room-linked-chunk mutations. We deliberately do NOT call
        // replace_utds() on event-focused/pinned caches here to avoid an ABBA deadlock:
        // pagination holds an event-focused cache lock and then tries to acquire the
        // room state lock (via `save_events`), while this method would hold the
        // room state lock and try to acquire event-focused cache locks.
        let (pinned_cache, ef_caches) = {
            let mut state = room_cache.state().write().await?;

            let pinned_cache = state.pinned_event_cache().cloned();
            let ef_caches: Vec<_> = state.event_focused_caches().cloned().collect();

            // Consider the room linked chunk.
            let mut new_events = Vec::with_capacity(events.len());
            for (event_id, decrypted, actions) in &events {
                if let Some((location, mut target_event)) = state.find_event(event_id).await? {
                    target_event.kind = TimelineEventKind::Decrypted(decrypted.clone());

                    if let Some(actions) = actions {
                        target_event.set_push_actions(actions.clone());
                    }

                    // TODO: `replace_event_at()` propagates changes to the store for every
                    // event, we should probably have a bulk version of this?
                    state.replace_event_at(location, target_event.clone()).await?;
                    new_events.push(target_event);
                }
            }

            // Read receipt events aren't encrypted, so we can't have decrypted a new
            // one here. As a result, we don't have any new receipt events to
            // post-process, so we can just pass `None` here.
            //
            // Note: read receipts may be updated anyhow in the post-processing step,
            // as the redecryption may have decrypted some events that don't count as
            // unreads.
            let receipt_event = None;

            state
                .post_process_new_events(
                    new_events,
                    PostProcessingOrigin::Redecryption,
                    receipt_event,
                )
                .await?;

            room_cache.update_sender().send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: state.room_linked_chunk_mut().updates_as_vector_diffs(),
                    origin: EventsOrigin::Cache,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() }),
            );

            (pinned_cache, ef_caches)
        };
        // Room state write lock is dropped here.

        // Phase 2: replace UTDs in pinned and event-focused caches WITHOUT
        // holding the room state lock. These caches have their own internal
        // locks and don't need the room state lock.
        if let Some(pinned_cache) = pinned_cache {
            pinned_cache.replace_utds(&events).await?;
        }

        // TODO: This ain't great for performance; there shouldn't be that many
        // event-focused caches alive at the same time, but they could
        // accumulate over time. Consider keeping track of which linked chunk
        // contain which event id, to avoid doing the linear searches here.
        join_all(ef_caches.iter().map(|cache| cache.replace_utds(&events))).await;

        // Finally, tell any report subscribers which events were resolved.
        let report =
            RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids };
        let _ = self.inner.redecryption_channels.utd_reporter.send(report);

        Ok(())
    }
449
    /// Attempt to decrypt a single event.
    ///
    /// If a [`Room`] is available, decryption goes through
    /// [`Room::decrypt_event`], which can also compute push actions when a
    /// `push_context` is supplied. Otherwise we fall back to the
    /// [`OlmMachine`] directly, in which case no push actions are computed.
    ///
    /// Returns `None` if the event could not be decrypted (a warning is
    /// logged), or — in the fallback path — if no client or `OlmMachine` is
    /// available.
    async fn decrypt_event(
        &self,
        room_id: &RoomId,
        room: Option<&Room>,
        push_context: Option<&PushContext>,
        event: &Raw<EncryptedEvent>,
    ) -> Option<(DecryptedRoomEvent, Option<Vec<Action>>)> {
        if let Some(room) = room {
            match room
                .decrypt_event(
                    event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
                    push_context,
                )
                .await
            {
                Ok(maybe_decrypted) => {
                    // Grab the push actions before `kind` is moved out below.
                    let actions = maybe_decrypted.push_actions().map(|a| a.to_vec());

                    if let TimelineEventKind::Decrypted(decrypted) = maybe_decrypted.kind {
                        Some((decrypted, actions))
                    } else {
                        // `decrypt_event` succeeded but the event is still not
                        // in the `Decrypted` state.
                        warn!(
                            "Failed to redecrypt an event despite receiving a room key or request to redecrypt"
                        );
                        None
                    }
                }
                Err(e) => {
                    warn!(
                        "Failed to redecrypt an event despite receiving a room key or request to redecrypt {e:?}"
                    );
                    None
                }
            }
        } else {
            // No `Room` available: decrypt with the `OlmMachine` directly.
            // Push actions can't be computed in this path.
            let client = self.inner.client().ok()?;
            let machine = client.olm_machine().await;
            let machine = machine.as_ref()?;

            match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await {
                Ok(decrypted) => Some((decrypted, None)),
                Err(e) => {
                    warn!(
                        "Failed to redecrypt an event despite receiving a room key or a request to redecrypt {e:?}"
                    );
                    None
                }
            }
        }
    }
501
502    /// Attempt to redecrypt events after a room key with the given session ID
503    /// has been received.
504    #[instrument(skip_all, fields(room_id, session_id))]
505    async fn retry_decryption(
506        &self,
507        room_id: &RoomId,
508        session_id: SessionId<'_>,
509    ) -> Result<(), EventCacheError> {
510        // Get all the relevant UTDs.
511        let events = self.get_utds(room_id, session_id).await?;
512        self.retry_decryption_for_events(room_id, events).await
513    }
514
515    /// Attempt to redecrypt events that were persisted in the event cache.
516    #[instrument(skip_all, fields(updates.linked_chunk_id))]
517    async fn retry_decryption_for_event_cache_updates(
518        &self,
519        updates: RoomEventCacheLinkedChunkUpdate,
520    ) -> Result<(), EventCacheError> {
521        let room_id = updates.linked_chunk_id.room_id();
522        let events: Vec<_> = updates
523            .updates
524            .into_iter()
525            .flat_map(|updates| updates.into_items())
526            .filter_map(filter_timeline_event_to_utd)
527            .collect();
528
529        self.retry_decryption_for_events(room_id, events).await
530    }
531
532    async fn retry_decryption_for_in_memory_events(&self) {
533        let utds = self.get_utds_from_memory().await;
534
535        for (room_id, utds) in utds.into_iter() {
536            if let Err(e) = self.retry_decryption_for_events(&room_id, utds).await {
537                warn!(%room_id, "Failed to redecrypt in-memory events {e:?}");
538            }
539        }
540    }
541
    /// Attempt to redecrypt a chunk of UTDs.
    ///
    /// Successfully decrypted events are handed to
    /// [`EventCache::on_resolved_utds`], which replaces them in the cache and
    /// notifies listeners. Events that still fail to decrypt are simply left
    /// as-is (a warning is logged by `decrypt_event`).
    #[instrument(skip_all, fields(room_id, session_id))]
    async fn retry_decryption_for_events(
        &self,
        room_id: &RoomId,
        events: Vec<EventIdAndUtd>,
    ) -> Result<(), EventCacheError> {
        trace!("Retrying to decrypt");

        if events.is_empty() {
            trace!("No relevant events found.");
            return Ok(());
        }

        // A `Room` (and hence a push context) may be unavailable; in that case
        // `decrypt_event` falls back to the bare `OlmMachine`.
        let room = self.inner.client().ok().and_then(|client| client.get_room(room_id));
        let push_context =
            if let Some(room) = &room { room.push_context().await.ok().flatten() } else { None };

        // Let's attempt to decrypt them.
        let mut decrypted_events = Vec::with_capacity(events.len());

        for (event_id, event) in events {
            // If we managed to decrypt the event, and we should have to since we received
            // the room key for this specific event, then replace the event.
            if let Some((decrypted, actions)) = self
                .decrypt_event(
                    room_id,
                    room.as_ref(),
                    push_context.as_ref(),
                    event.cast_ref_unchecked(),
                )
                .await
            {
                decrypted_events.push((event_id, decrypted, actions));
            }
        }

        let event_ids: BTreeSet<_> =
            decrypted_events.iter().map(|(event_id, _, _)| event_id).collect();

        if !event_ids.is_empty() {
            trace!(?event_ids, "Successfully redecrypted events");
        }

        // Replace the events and notify listeners that UTDs have been replaced with
        // decrypted events.
        self.on_resolved_utds(room_id, decrypted_events).await?;

        Ok(())
    }
592
593    /// Attempt to update the encryption info for the given list of events.
594    async fn update_encryption_info_for_events(
595        &self,
596        room: &Room,
597        events: Vec<EventIdAndEvent>,
598    ) -> Result<(), EventCacheError> {
599        // Let's attempt to update their encryption info.
600        let mut updated_events = Vec::with_capacity(events.len());
601
602        for (event_id, mut event) in events {
603            if let Some(session_id) = event.encryption_info.session_id() {
604                let new_encryption_info =
605                    room.get_encryption_info(session_id, &event.encryption_info.sender).await;
606
607                // Only create a replacement if the encryption info actually changed.
608                if let Some(new_encryption_info) = new_encryption_info
609                    && event.encryption_info != new_encryption_info
610                {
611                    event.encryption_info = new_encryption_info;
612                    updated_events.push((event_id, event, None));
613                }
614            }
615        }
616
617        let event_ids: BTreeSet<_> =
618            updated_events.iter().map(|(event_id, _, _)| event_id).collect();
619
620        if !event_ids.is_empty() {
621            trace!(?event_ids, "Replacing the encryption info of some events");
622        }
623
624        self.on_resolved_utds(room.room_id(), updated_events).await
625    }
626
627    #[instrument(skip_all, fields(room_id, session_id))]
628    async fn update_encryption_info(
629        &self,
630        room_id: &RoomId,
631        session_id: SessionId<'_>,
632    ) -> Result<(), EventCacheError> {
633        trace!("Updating encryption info");
634
635        let Ok(client) = self.inner.client() else {
636            return Ok(());
637        };
638
639        let Some(room) = client.get_room(room_id) else {
640            return Ok(());
641        };
642
643        // Get all the relevant events.
644        let events = self.get_decrypted_events(room_id, session_id).await?;
645
646        if events.is_empty() {
647            trace!("No relevant events found.");
648            return Ok(());
649        }
650
651        // Let's attempt to update their encryption info.
652        self.update_encryption_info_for_events(&room, events).await
653    }
654
655    async fn retry_update_encryption_info_for_in_memory_events(&self) {
656        let decrypted_events = self.get_decrypted_events_from_memory().await;
657
658        for (room_id, events) in decrypted_events.into_iter() {
659            let Some(room) = self.inner.client().ok().and_then(|c| c.get_room(&room_id)) else {
660                continue;
661            };
662
663            if let Err(e) = self.update_encryption_info_for_events(&room, events).await {
664                warn!(
665                    %room_id,
666                    "Failed to replace the encryption info for in-memory events {e:?}"
667                );
668            }
669        }
670    }
671
    /// Retry to decrypt and update the encryption info of all the events
    /// contained in the memory part of the event cache.
    ///
    /// This list of events will map one-to-one to the events that components
    /// subscribed to the event cache have received and are keeping cached.
    ///
    /// If components subscribed to the event cache are doing additional
    /// caching, they'll need to listen to [RedecryptorReport]s and
    /// explicitly request redecryption attempts using
    /// [EventCache::request_decryption].
    async fn retry_in_memory_events(&self) {
        // First resolve outstanding UTDs, then refresh the encryption info of
        // already-decrypted events.
        self.retry_decryption_for_in_memory_events().await;
        self.retry_update_encryption_info_for_in_memory_events().await;
    }
686
687    /// Explicitly request the redecryption of a set of events.
688    ///
689    /// The redecryption logic in the event cache might sometimes miss that a
690    /// room key has become available and that a certain set of events has
691    /// become decryptable.
692    ///
693    /// This might happen because some room keys might arrive in a separate
694    /// process handling push notifications or if a room key arrives but the
695    /// process shuts down before we could have decrypted the events.
696    ///
697    /// For this reason it is useful to tell the event cache explicitly that
698    /// some events should be retried to be redecrypted.
699    ///
700    /// This method allows you to do so. The events that get decrypted, if any,
701    /// will be advertised over the usual event cache subscription mechanism
702    /// which can be accessed using the [`RoomEventCache::subscribe()`]
703    /// method.
704    ///
705    /// # Examples
706    ///
707    /// ```no_run
708    /// # use matrix_sdk::{Client, event_cache::DecryptionRetryRequest};
709    /// # use url::Url;
710    /// # use ruma::owned_room_id;
711    /// # use std::collections::BTreeSet;
712    /// # async {
713    /// # let homeserver = Url::parse("http://localhost:8080")?;
714    /// # let client = Client::new(homeserver).await?;
715    /// let event_cache = client.event_cache();
716    /// let room_id = owned_room_id!("!my_room:localhost");
717    ///
718    /// let request = DecryptionRetryRequest {
719    ///     room_id,
720    ///     utd_session_ids: BTreeSet::from(["session_id".into()]),
721    ///     refresh_info_session_ids: BTreeSet::new(),
722    /// };
723    ///
724    /// event_cache.request_decryption(request);
725    /// # anyhow::Ok(()) };
726    /// ```
727    pub fn request_decryption(&self, request: DecryptionRetryRequest) {
728        let _ =
729            self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err(
730                |_| warn!("Requesting a decryption while the redecryption task has been shut down"),
731            );
732    }
733
734    /// Subscribe to reports that the redecryptor generates.
735    ///
736    /// The redecryption logic in the event cache might sometimes miss that a
737    /// room key has become available and that a certain set of events has
738    /// become decryptable.
739    ///
740    /// This might happen because some room keys might arrive in a separate
741    /// process handling push notifications or if room keys arrive faster than
742    /// we can handle them.
743    ///
744    /// This stream can be used to get notified about such situations as well as
745    /// a general channel where the event cache reports which events got
746    /// successfully redecrypted.
747    ///
748    /// # Examples
749    ///
750    /// ```no_run
751    /// # use matrix_sdk::{Client, event_cache::RedecryptorReport};
752    /// # use url::Url;
753    /// # use tokio_stream::StreamExt;
754    /// # async {
755    /// # let homeserver = Url::parse("http://localhost:8080")?;
756    /// # let client = Client::new(homeserver).await?;
757    /// let event_cache = client.event_cache();
758    ///
759    /// let mut stream = event_cache.subscribe_to_decryption_reports();
760    ///
761    /// while let Some(Ok(report)) = stream.next().await {
762    ///     match report {
763    ///         RedecryptorReport::Lagging => {
764    ///             // The event cache might have missed to redecrypt some events. We should tell
765    ///             // it which events we care about, i.e. which events we're displaying to the
766    ///             // user, and let it redecrypt things with an explicit request.
767    ///         }
768    ///         RedecryptorReport::BackupAvailable => {
769    ///             // A backup has become available. We can, just like in the Lagging case, tell
770    ///             // the event cache to attempt to redecrypt some events.
771    ///             //
772    ///             // This is only necessary with the BackupDownloadStrategy::OnDecryptionFailure
773    ///             // as the decryption attempt in this case will trigger the download of the
774    ///             // room key from the backup.
775    ///         }
776    ///         RedecryptorReport::ResolvedUtds { .. } => {
777    ///             // This may be interesting for statistical reasons or in case we'd like to
778    ///             // fetch and inspect these events in some manner.
779    ///         }
780    ///     }
781    /// }
782    /// # anyhow::Ok(()) };
783    /// ```
784    pub fn subscribe_to_decryption_reports(
785        &self,
786    ) -> impl Stream<Item = Result<RedecryptorReport, BroadcastStreamRecvError>> {
787        BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe())
788    }
789}
790
791#[inline(always)]
792fn upgrade_event_cache(cache: &Weak<EventCacheInner>) -> Option<EventCache> {
793    cache.upgrade().map(|inner| EventCache { inner })
794}
795
796async fn send_report_and_retry_memory_events(
797    cache: &Weak<EventCacheInner>,
798    report: RedecryptorReport,
799) -> Result<(), ()> {
800    let Some(cache) = upgrade_event_cache(cache) else {
801        return Err(());
802    };
803
804    cache.retry_in_memory_events().await;
805    let _ = cache.inner.redecryption_channels.utd_reporter.send(report);
806
807    Ok(())
808}
809
/// Struct holding on to the redecryption task.
///
/// This struct implements the bulk of the redecryption task. It listens to the
/// various streams that should trigger redecryption attempts.
///
/// For more info see the [module level docs](self).
pub(crate) struct Redecryptor {
    // Handle to the long-running background task. The task is aborted when
    // this struct is dropped (see the `abort_on_drop()` call in
    // `Redecryptor::new`).
    _task: BackgroundTaskHandle,
}
819
impl Redecryptor {
    /// Create a new [`Redecryptor`].
    ///
    /// This creates a task that listens to various streams and attempts to
    /// redecrypt UTDs that can be found inside the [`EventCache`].
    pub(super) fn new(
        client: &Client,
        cache: Weak<EventCacheInner>,
        receiver: UnboundedReceiver<DecryptionRetryRequest>,
        linked_chunk_update_sender: &Sender<RoomEventCacheLinkedChunkUpdate>,
    ) -> Self {
        let linked_chunk_stream = BroadcastStream::new(linked_chunk_update_sender.subscribe());
        let backup_state_stream = client.encryption().backups().state_stream();

        let task = client
            .task_monitor()
            .spawn_infinite_task("event_cache::redecryptor", async {
                let request_redecryption_stream = UnboundedReceiverStream::new(receiver);

                Self::listen_for_room_keys_task(
                    cache,
                    request_redecryption_stream,
                    linked_chunk_stream,
                    backup_state_stream,
                )
                .await;
            })
            .abort_on_drop();

        Self { _task: task }
    }

    /// (Re)-subscribe to the room key stream from the [`OlmMachine`].
    ///
    /// This needs to happen any time this stream returns a `None` meaning that
    /// the sending part of the stream has been dropped.
    ///
    /// Returns `None` if the event cache or the client have been dropped, or
    /// if no `OlmMachine` is currently available.
    async fn subscribe_to_room_key_stream(
        cache: &Weak<EventCacheInner>,
    ) -> Option<(
        impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>,
        impl Stream<Item = Vec<RoomKeyWithheldInfo>>,
    )> {
        let event_cache = cache.upgrade()?;
        let client = event_cache.client().ok()?;
        let machine = client.olm_machine().await;

        machine.as_ref().map(|m| {
            (m.store().room_keys_received_stream(), m.store().room_keys_withheld_received_stream())
        })
    }

    /// Run the inner redecryption select loop.
    ///
    /// Returns `true` when the room key streams need to be recreated (they
    /// returned `None`, which happens when the `OlmMachine` gets regenerated)
    /// and the loop should be entered again; returns `false` when the task
    /// should shut down (the event cache was dropped or all streams
    /// terminated).
    async fn redecryption_loop(
        cache: &Weak<EventCacheInner>,
        decryption_request_stream: &mut Pin<&mut impl Stream<Item = DecryptionRetryRequest>>,
        events_stream: &mut Pin<
            &mut impl Stream<Item = Result<RoomEventCacheLinkedChunkUpdate, BroadcastStreamRecvError>>,
        >,
        backup_state_stream: &mut Pin<
            &mut impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
        >,
    ) -> bool {
        let Some((room_key_stream, withheld_stream)) =
            Self::subscribe_to_room_key_stream(cache).await
        else {
            return false;
        };

        pin_mut!(room_key_stream);
        pin_mut!(withheld_stream);

        loop {
            tokio::select! {
                // An explicit request, presumably from the timeline, has been received to decrypt
                // events that were encrypted with a certain room key.
                Some(request) = decryption_request_stream.next() => {
                        let Some(cache) = upgrade_event_cache(cache) else {
                            break false;
                        };

                        trace!(?request, "Received a redecryption request");

                        for session_id in request.utd_session_ids {
                            let _ = cache
                                .retry_decryption(&request.room_id, &session_id)
                                .await
                                .inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}"));
                        }

                        for session_id in request.refresh_info_session_ids {
                            let _ = cache.update_encryption_info(&request.room_id, &session_id).await.inspect_err(|e|
                                warn!(
                                    room_id = %request.room_id,
                                    session_id = session_id,
                                    "Unable to update the encryption info {e:?}",
                            ));
                        }
                }
                // The room key stream from the OlmMachine. Needs to be recreated every time we
                // receive a `None` from the stream.
                room_keys = room_key_stream.next() => {
                    match room_keys {
                        Some(Ok(room_keys)) => {
                            // Alright, some room keys were received and persisted in our store,
                            // let's attempt to redecrypt events that were encrypted using these
                            // room keys.
                            let Some(cache) = upgrade_event_cache(cache) else {
                                break false;
                            };

                            trace!(?room_keys, "Received new room keys");

                            for key in &room_keys {
                                let _ = cache
                                    .retry_decryption(&key.room_id, &key.session_id)
                                    .await
                                    .inspect_err(|e| warn!("Error redecrypting {e:?}"));
                            }

                            for key in room_keys {
                                let _ = cache.update_encryption_info(&key.room_id, &key.session_id).await.inspect_err(|e|
                                    warn!(
                                        room_id = %key.room_id,
                                        session_id = key.session_id,
                                        "Unable to update the encryption info {e:?}",
                                ));
                            }
                        },
                        Some(Err(_)) => {
                            // We missed some room keys, we need to report this in case a listener
                            // has an idea which UTDs we should attempt to redecrypt.
                            //
                            // This would most likely be the timeline from the UI crate. The
                            // timeline might attempt to redecrypt all UTDs it is showing to the
                            // user.
                            warn!("The room key stream lagged, reporting the lag to our listeners");

                            if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
                                break false;
                            }
                        },
                        // The stream got closed, this could mean that our OlmMachine got
                        // regenerated, let's return true and try to recreate the stream.
                        None => {
                            break true;
                        }
                    }
                }
                // Withheld info from the OlmMachine; used to refresh the encryption info of
                // affected events rather than to decrypt them.
                withheld_info = withheld_stream.next() => {
                    match withheld_info {
                        Some(infos) => {
                            let Some(cache) = upgrade_event_cache(cache) else {
                                break false;
                            };

                            trace!(?infos, "Received new withheld infos");

                            for RoomKeyWithheldInfo { room_id, session_id, .. } in &infos {
                                let _ = cache.update_encryption_info(room_id, session_id).await.inspect_err(|e|
                                    warn!(
                                        room_id = %room_id,
                                        session_id = session_id,
                                        "Unable to update the encryption info {e:?}",
                                ));
                            }
                        }
                        // The stream got closed, same as for the room key stream, we'll try to
                        // recreate the streams.
                        None => break true,
                    }
                }
                // Events that the event cache handled. If the event cache received any UTDs, let's
                // attempt to redecrypt them in case the room key was received before the event
                // cache was able to return them using `get_utds()`.
                Some(event_updates) = events_stream.next() => {
                    match event_updates {
                        Ok(updates) => {
                            let Some(cache) = upgrade_event_cache(cache) else {
                                break false;
                            };

                            let linked_chunk_id = updates.linked_chunk_id.to_owned();

                            let _ = cache.retry_decryption_for_event_cache_updates(updates).await.inspect_err(|e|
                                warn!(
                                    %linked_chunk_id,
                                    "Unable to handle UTDs from event cache updates {e:?}",
                                )
                            );
                        }
                        Err(_) => {
                            // The linked chunk update stream lagged; fall back to retrying the
                            // in-memory events and telling listeners about the lag.
                            if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
                                break false;
                            }
                        }
                    }
                }
                Some(backup_state_update) = backup_state_stream.next() => {
                    match backup_state_update {
                        Ok(state) => {
                            match state {
                                BackupState::Unknown |
                                BackupState::Creating |
                                BackupState::Enabling |
                                BackupState::Resuming |
                                BackupState::Downloading |
                                BackupState::Disabling =>{
                                    // Those states aren't particularly interesting to components
                                    // listening to R2D2 reports.
                                }
                                BackupState::Enabled => {
                                    // Alright, the backup got enabled, we might or might not have
                                    // downloaded the room keys from the backup. In case they get
                                    // downloaded on-demand, let's try to decrypt all the events we
                                    // have cached in-memory.
                                    if send_report_and_retry_memory_events(cache, RedecryptorReport::BackupAvailable).await.is_err() {
                                        break false;
                                    }
                                }
                            }
                        }
                        Err(_) => {
                            // The backup state stream lagged; treat it the same as any other lag.
                            if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
                                break false;
                            }
                        }
                    }
                }
                // All the streams have terminated; nothing left to listen to, shut down.
                else => break false,
            }
        }
    }

    /// The long-running entry point of the redecryption task.
    ///
    /// Re-runs [`Self::redecryption_loop`] until it signals shutdown,
    /// recreating the room key streams between iterations.
    async fn listen_for_room_keys_task(
        cache: Weak<EventCacheInner>,
        decryption_request_stream: UnboundedReceiverStream<DecryptionRetryRequest>,
        events_stream: BroadcastStream<RoomEventCacheLinkedChunkUpdate>,
        backup_state_stream: impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
    ) {
        // We pin the decryption request stream here since that one doesn't need to be
        // recreated and we don't want to miss messages coming from the stream
        // while recreating it unnecessarily.
        pin_mut!(decryption_request_stream);
        pin_mut!(events_stream);
        pin_mut!(backup_state_stream);

        while Self::redecryption_loop(
            &cache,
            &mut decryption_request_stream,
            &mut events_stream,
            &mut backup_state_stream,
        )
        .await
        {
            info!("Regenerating the re-decryption streams");

            // Report that the stream got recreated so listeners know about it, at the same
            // time retry to decrypt anything we have cached in memory.
            if send_report_and_retry_memory_events(&cache, RedecryptorReport::Lagging)
                .await
                .is_err()
            {
                break;
            }
        }

        info!("Shutting down the event cache redecryptor");
    }
}
1088
1089#[cfg(not(target_family = "wasm"))]
1090#[cfg(test)]
1091mod tests {
1092    use std::{
1093        collections::BTreeSet,
1094        sync::{
1095            Arc,
1096            atomic::{AtomicBool, Ordering},
1097        },
1098        time::Duration,
1099    };
1100
1101    use assert_matches2::assert_matches;
1102    use async_trait::async_trait;
1103    use eyeball_im::VectorDiff;
1104    use matrix_sdk_base::{
1105        cross_process_lock::CrossProcessLockGeneration,
1106        crypto::types::events::{ToDeviceEvent, room::encrypted::ToDeviceEncryptedEventContent},
1107        deserialized_responses::{TimelineEventKind, VerificationState},
1108        event_cache::{
1109            Event, Gap,
1110            store::{EventCacheStore, EventCacheStoreError, MemoryStore},
1111        },
1112        linked_chunk::{
1113            ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
1114            RawChunk, Update,
1115        },
1116        locks::Mutex,
1117        sleep::sleep,
1118        store::StoreConfig,
1119        timeout::timeout,
1120    };
1121    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
1122    use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
1123    use ruma::{
1124        EventId, OwnedEventId, RoomId, RoomVersionId, device_id, event_id,
1125        events::{AnySyncTimelineEvent, relation::RelationType},
1126        room_id,
1127        serde::Raw,
1128        user_id,
1129    };
1130    use serde_json::json;
1131    use tokio::sync::oneshot::{self, Sender};
1132    use tracing::{Instrument, info};
1133
1134    use crate::{
1135        Client, assert_let_timeout,
1136        encryption::EncryptionSettings,
1137        event_cache::{
1138            DecryptionRetryRequest, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
1139            TimelineVectorDiffs,
1140        },
1141        test_utils::mocks::MatrixMockServer,
1142    };
1143
    /// A wrapper for the memory store for the event cache.
    ///
    /// Delays the persisting of events, or linked chunk updates, to allow the
    /// testing of race conditions between the event cache and R2D2.
    #[derive(Debug, Clone)]
    struct DelayingStore {
        // The real store that every operation is delegated to.
        memory_store: MemoryStore,
        // While `true`, `handle_linked_chunk_updates` busy-waits (sleeping in
        // 10ms steps) instead of persisting; cleared by `stop_delaying()`.
        delaying: Arc<AtomicBool>,
        // One-shot sender used to notify `stop_delaying()` that the delayed
        // storage operation has completed. NOTE(review): `foo` is a
        // placeholder name; something like `done_notifier` would be clearer.
        foo: Arc<Mutex<Option<Sender<()>>>>,
    }
1154
1155    impl DelayingStore {
1156        fn new() -> Self {
1157            Self {
1158                memory_store: MemoryStore::new(),
1159                delaying: AtomicBool::new(true).into(),
1160                foo: Arc::new(Mutex::new(None)),
1161            }
1162        }
1163
1164        async fn stop_delaying(&self) {
1165            let (sender, receiver) = oneshot::channel();
1166
1167            {
1168                *self.foo.lock() = Some(sender);
1169            }
1170
1171            self.delaying.store(false, Ordering::SeqCst);
1172
1173            receiver.await.expect("We should be able to receive a response")
1174        }
1175    }
1176
    // Every method delegates straight to the wrapped `MemoryStore`, except
    // `handle_linked_chunk_updates` which blocks while the `delaying` flag is
    // set.
    #[cfg_attr(target_family = "wasm", async_trait(?Send))]
    #[cfg_attr(not(target_family = "wasm"), async_trait)]
    impl EventCacheStore for DelayingStore {
        type Error = EventCacheStoreError;

        async fn try_take_leased_lock(
            &self,
            lease_duration_ms: u32,
            key: &str,
            holder: &str,
        ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
            self.memory_store.try_take_leased_lock(lease_duration_ms, key, holder).await
        }

        async fn handle_linked_chunk_updates(
            &self,
            linked_chunk_id: LinkedChunkId<'_>,
            updates: Vec<Update<Event, Gap>>,
        ) -> Result<(), Self::Error> {
            // This is the key behaviour of this store - we wait to set this value until
            // someone calls `stop_delaying`.
            //
            // We use `sleep` here for simplicity. The cool way would be to use a custom
            // waker or something like that.
            while self.delaying.load(Ordering::SeqCst) {
                sleep(Duration::from_millis(10)).await;
            }

            // Take the notifier (if any) before persisting, then let the
            // waiting `stop_delaying()` caller know once the write is done.
            let sender = self.foo.lock().take();
            let ret = self.memory_store.handle_linked_chunk_updates(linked_chunk_id, updates).await;

            if let Some(sender) = sender {
                sender.send(()).expect("We should be able to notify the other side that we're done with the storage operation");
            }

            ret
        }

        async fn load_all_chunks(
            &self,
            linked_chunk_id: LinkedChunkId<'_>,
        ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
            self.memory_store.load_all_chunks(linked_chunk_id).await
        }

        async fn load_all_chunks_metadata(
            &self,
            linked_chunk_id: LinkedChunkId<'_>,
        ) -> Result<Vec<ChunkMetadata>, Self::Error> {
            self.memory_store.load_all_chunks_metadata(linked_chunk_id).await
        }

        async fn load_last_chunk(
            &self,
            linked_chunk_id: LinkedChunkId<'_>,
        ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
            self.memory_store.load_last_chunk(linked_chunk_id).await
        }

        async fn load_previous_chunk(
            &self,
            linked_chunk_id: LinkedChunkId<'_>,
            before_chunk_identifier: ChunkIdentifier,
        ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
            self.memory_store.load_previous_chunk(linked_chunk_id, before_chunk_identifier).await
        }

        async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
            self.memory_store.clear_all_linked_chunks().await
        }

        async fn filter_duplicated_events(
            &self,
            linked_chunk_id: LinkedChunkId<'_>,
            events: Vec<OwnedEventId>,
        ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
            self.memory_store.filter_duplicated_events(linked_chunk_id, events).await
        }

        async fn find_event(
            &self,
            room_id: &RoomId,
            event_id: &EventId,
        ) -> Result<Option<Event>, Self::Error> {
            self.memory_store.find_event(room_id, event_id).await
        }

        async fn find_event_relations(
            &self,
            room_id: &RoomId,
            event_id: &EventId,
            filters: Option<&[RelationType]>,
        ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
            self.memory_store.find_event_relations(room_id, event_id, filters).await
        }

        async fn get_room_events(
            &self,
            room_id: &RoomId,
            event_type: Option<&str>,
            session_id: Option<&str>,
        ) -> Result<Vec<Event>, Self::Error> {
            self.memory_store.get_room_events(room_id, event_type, session_id).await
        }

        async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
            self.memory_store.save_event(room_id, event).await
        }

        async fn optimize(&self) -> Result<(), Self::Error> {
            self.memory_store.optimize().await
        }

        async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
            self.memory_store.get_size().await
        }
    }
1294
    /// Set up two end-to-end-encryption-capable test clients, Alice and Bob,
    /// that are both joined to `room_id`.
    ///
    /// Bob's event cache is subscribed to. When `use_delayed_store` is set,
    /// Bob's event cache is backed by a [`DelayingStore`], which is also
    /// returned so tests can race storage operations against R2D2.
    async fn set_up_clients(
        room_id: &RoomId,
        alice_enables_cross_signing: bool,
        use_delayed_store: bool,
    ) -> (Client, Client, MatrixMockServer, Option<DelayingStore>) {
        let alice_span = tracing::info_span!("alice");
        let bob_span = tracing::info_span!("bob");

        let alice_user_id = user_id!("@alice:localhost");
        let alice_device_id = device_id!("ALICEDEVICE");
        let bob_user_id = user_id!("@bob:localhost");
        let bob_device_id = device_id!("BOBDEVICE");

        let matrix_mock_server = MatrixMockServer::new().await;
        matrix_mock_server.mock_crypto_endpoints_preset().await;

        let encryption_settings = EncryptionSettings {
            auto_enable_cross_signing: alice_enables_cross_signing,
            ..Default::default()
        };

        // Create some clients for Alice and Bob.

        let alice = matrix_mock_server
            .client_builder_for_crypto_end_to_end(alice_user_id, alice_device_id)
            .on_builder(|builder| {
                builder
                    .with_enable_share_history_on_invite(true)
                    .with_encryption_settings(encryption_settings)
            })
            .build()
            .instrument(alice_span.clone())
            .await;

        let encryption_settings =
            EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() };

        // Bob either gets the delaying store (returned to the caller so it can
        // control the delay) or a plain default store config.
        let (store_config, store) = if use_delayed_store {
            let store = DelayingStore::new();

            (
                StoreConfig::new(CrossProcessLockConfig::multi_process(
                    "delayed_store_event_cache_test",
                ))
                .event_cache_store(store.clone()),
                Some(store),
            )
        } else {
            (
                StoreConfig::new(CrossProcessLockConfig::multi_process(
                    "normal_store_event_cache_test",
                )),
                None,
            )
        };

        let bob = matrix_mock_server
            .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id)
            .on_builder(|builder| {
                builder
                    .with_enable_share_history_on_invite(true)
                    .with_encryption_settings(encryption_settings)
                    .store_config(store_config)
            })
            .build()
            .instrument(bob_span.clone())
            .await;

        bob.event_cache().subscribe().expect("Bob should be able to enable the event cache");

        // Ensure that Alice and Bob are aware of their devices and identities.
        matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await;

        let event_factory = EventFactory::new().room(room_id).sender(alice_user_id);

        // Let us now create a room for them.
        let room_builder = JoinedRoomBuilder::new(room_id)
            .add_state_event(event_factory.create(alice_user_id, RoomVersionId::V1))
            .add_state_event(event_factory.room_encryption());

        matrix_mock_server
            .mock_sync()
            .ok_and_run(&alice, |builder| {
                builder.add_joined_room(room_builder.clone());
            })
            .instrument(alice_span)
            .await;

        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_joined_room(room_builder);
            })
            .instrument(bob_span)
            .await;

        (alice, bob, matrix_mock_server, store)
    }
1393
    /// Have Alice send an encrypted message into the room, capturing both the
    /// sent room event and the to-device message carrying the room key.
    ///
    /// Returns the captured `(event, room_key)` pair so a test can deliver
    /// them to Bob in whichever order it wants.
    async fn prepare_room(
        matrix_mock_server: &MatrixMockServer,
        event_factory: &EventFactory,
        alice: &Client,
        bob: &Client,
        room_id: &RoomId,
    ) -> (Raw<AnySyncTimelineEvent>, Raw<ToDeviceEvent<ToDeviceEncryptedEventContent>>) {
        let alice_user_id = alice.user_id().unwrap();
        let bob_user_id = bob.user_id().unwrap();

        let alice_member_event = event_factory.member(alice_user_id).into_raw();
        let bob_member_event = event_factory.member(bob_user_id).into_raw();

        let room = alice
            .get_room(room_id)
            .expect("Alice should have access to the room now that we synced");

        // Alice will send a single event to the room, but this will trigger a to-device
        // message containing the room key to be sent as well. We capture both the event
        // and the to-device message.

        let event_type = "m.room.message";
        let content = json!({"body": "It's a secret to everybody", "msgtype": "m.text"});

        let event_id = event_id!("$some_id");
        let (event_receiver, mock) =
            matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id);
        let (_guard, room_key) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await;

        {
            let _guard = mock.mock_once().mount_as_scoped().await;

            matrix_mock_server
                .mock_get_members()
                .ok(vec![alice_member_event.clone(), bob_member_event.clone()])
                .mock_once()
                .mount()
                .await;

            room.send_raw(event_type, content)
                .await
                .expect("We should be able to send an initial message");
        };

        // Let us retrieve the captured event and to-device message.
        let event = event_receiver.await.expect("Alice should have sent the event by now");
        let room_key = room_key.await;

        (event, room_key)
    }
1444
1445    #[async_test]
1446    async fn test_redecryptor() {
1447        let room_id = room_id!("!test:localhost");
1448
1449        let event_factory = EventFactory::new().room(room_id);
1450        let (alice, bob, matrix_mock_server, _) = set_up_clients(room_id, true, false).await;
1451
1452        let (event, room_key) =
1453            prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;
1454
1455        // Let's now see what Bob's event cache does.
1456
1457        let event_cache = bob.event_cache();
1458        let (room_cache, _) = event_cache
1459            .for_room(room_id)
1460            .await
1461            .expect("We should be able to get to the event cache for a specific room");
1462
1463        let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
1464        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
1465
        // We regenerate the Olm machine to check if the room key stream is
        // recreated correctly.
1468        bob.inner
1469            .base_client
1470            .regenerate_olm(None)
1471            .await
1472            .expect("We should be able to regenerate the Olm machine");
1473
1474        // Let us forward the event to Bob.
1475        matrix_mock_server
1476            .mock_sync()
1477            .ok_and_run(&bob, |builder| {
1478                builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
1479            })
1480            .await;
1481
1482        // Alright, Bob has received an update from the cache.
1483
1484        assert_let_timeout!(
1485            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1486                subscriber.recv()
1487        );
1488
1489        // There should be a single new event, and it should be a UTD as we did not
1490        // receive the room key yet.
1491        assert_eq!(diffs.len(), 1);
1492        assert_matches!(&diffs[0], VectorDiff::Append { values });
1493        assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });
1494
1495        assert_let_timeout!(
1496            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1497        );
1498        assert_eq!(expected_room_id, room_id);
1499        assert!(generic_stream.is_empty());
1500
1501        // Now we send the room key to Bob.
1502        matrix_mock_server
1503            .mock_sync()
1504            .ok_and_run(&bob, |builder| {
1505                builder.add_to_device_event(
1506                    room_key
1507                        .deserialize_as()
1508                        .expect("We should be able to deserialize the room key"),
1509                );
1510            })
1511            .await;
1512
1513        // Bob should receive a new update from the cache.
1514        assert_let_timeout!(
1515            Duration::from_secs(1),
1516            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
1517                subscriber.recv()
1518        );
1519
1520        // It should replace the UTD with a decrypted event.
1521        assert_eq!(diffs.len(), 1);
1522        assert_matches!(&diffs[0], VectorDiff::Set { index, value });
1523        assert_eq!(*index, 0);
1524        assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
1525
1526        assert_let_timeout!(
1527            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
1528        );
1529        assert_eq!(expected_room_id, room_id);
1530        assert!(generic_stream.is_empty());
1531    }
1532
    /// After an event has been decrypted, an explicit [`DecryptionRetryRequest`]
    /// with `refresh_info_session_ids` must make the redecryptor re-emit the
    /// event with refreshed [`EncryptionInfo`] — here, after Alice bootstraps
    /// cross-signing and Bob learns about her new device/identity.
    #[async_test]
    async fn test_redecryptor_updating_encryption_info() {
        // Span used to attribute Bob's side of the test in trace output.
        let bob_span = tracing::info_span!("bob");

        let room_id = room_id!("!test:localhost");

        let event_factory = EventFactory::new().room(room_id);
        let (alice, bob, matrix_mock_server, _) = set_up_clients(room_id, false, false).await;

        let (event, room_key) =
            prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;

        // Let's now see what Bob's event cache does.

        let event_cache = bob.event_cache();
        let (room_cache, _) = event_cache
            .for_room(room_id)
            .instrument(bob_span.clone())
            .await
            .expect("We should be able to get to the event cache for a specific room");

        let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Let us forward the event to Bob, before the room key: it must land
        // in the cache as a UTD.
        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
            })
            .instrument(bob_span.clone())
            .await;

        // Alright, Bob has received an update from the cache.

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                subscriber.recv()
        );

        // There should be a single new event, and it should be a UTD as we did not
        // receive the room key yet.
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Append { values });
        assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });

        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);
        assert!(generic_stream.is_empty());

        // Now we send the room key to Bob.
        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_to_device_event(
                    room_key
                        .deserialize_as()
                        .expect("We should be able to deserialize the room key"),
                );
            })
            .instrument(bob_span.clone())
            .await;

        // Bob should receive a new update from the cache.
        assert_let_timeout!(
            Duration::from_secs(1),
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                subscriber.recv()
        );

        // It should replace the UTD with a decrypted event.
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value });
        assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });

        // Alice has no cross-signing identity yet, so the sender can only be
        // unverified at this point.
        let encryption_info = value.encryption_info().unwrap();
        assert_matches!(&encryption_info.verification_state, VerificationState::Unverified(_));

        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);
        assert!(generic_stream.is_empty());

        // Remember the session so we can ask for an encryption-info refresh of
        // exactly this megolm session below.
        let session_id = encryption_info.session_id().unwrap().to_owned();
        let alice_user_id = alice.user_id().unwrap();

        // Alice now creates the identity.
        alice
            .encryption()
            .bootstrap_cross_signing(None)
            .await
            .expect("Alice should be able to create the cross-signing keys");

        // Make Bob track Alice and pick up her device/identity change via sync.
        bob.update_tracked_users_for_testing([alice_user_id]).instrument(bob_span.clone()).await;
        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_change_device(alice_user_id);
            })
            .instrument(bob_span.clone())
            .await;

        // Request a refresh of the encryption info only — no UTDs to retry.
        bob.event_cache().request_decryption(DecryptionRetryRequest {
            room_id: room_id.into(),
            utd_session_ids: BTreeSet::new(),
            refresh_info_session_ids: BTreeSet::from([session_id]),
        });

        // Bob should again receive a new update from the cache, this time updating the
        // encryption info.
        assert_let_timeout!(
            Duration::from_secs(1),
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                subscriber.recv()
        );

        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value });
        assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
        let encryption_info = value.encryption_info().unwrap();

        assert_matches!(
            &encryption_info.verification_state,
            VerificationState::Unverified(_),
            "The event should now know about the identity but still be unverified"
        );

        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);
        assert!(generic_stream.is_empty());
    }
1669
    /// Race-condition test: the room key arrives while the event is still
    /// being processed (persistence is artificially delayed via the delayed
    /// store), so the first decryption attempt fails before the event is in
    /// the cache. The redecryptor must still pick the UTD up afterwards and
    /// replace it with the decrypted event.
    #[async_test]
    async fn test_event_is_redecrypted_even_if_key_arrives_while_event_processing() {
        let room_id = room_id!("!test:localhost");

        let event_factory = EventFactory::new().room(room_id);
        // The third flag asks `set_up_clients` for a store that delays writes,
        // letting us hold the event out of the cache while the key arrives.
        let (alice, bob, matrix_mock_server, delayed_store) =
            set_up_clients(room_id, true, true).await;

        let delayed_store = delayed_store.unwrap();

        let (event, room_key) =
            prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;

        let event_cache = bob.event_cache();

        // Let's now see what Bob's event cache does.
        let (room_cache, _) = event_cache
            .for_room(room_id)
            .await
            .expect("We should be able to get to the event cache for a specific room");

        let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Let us forward the event to Bob. Persistence is delayed, so it will
        // not hit the cache right away.
        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
            })
            .await;

        // Now we send the room key to Bob, while the event is still in flight.
        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_to_device_event(
                    room_key
                        .deserialize_as()
                        .expect("We should be able to deserialize the room key"),
                );
            })
            .await;

        info!("Stopping the delay");
        delayed_store.stop_delaying().await;

        // Now that the first decryption attempt has failed since the sync with the
        // event did not contain the room key, and the decryptor has received
        // the room key but the event was not persisted in the cache as of yet,
        // let the event cache process the event.

        // Alright, Bob has received an update from the cache.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                subscriber.recv()
        );

        // There should be a single new event, and it should be a UTD: the
        // initial decryption attempt happened before the room key was
        // available, so the event was persisted undecrypted.
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Append { values });
        assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });

        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);

        // Bob should receive a new update from the cache.
        assert_let_timeout!(
            Duration::from_secs(1),
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                subscriber.recv()
        );

        // It should replace the UTD with a decrypted event, in place (index 0).
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Set { index, value });
        assert_eq!(*index, 0);
        assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });

        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);
        assert!(generic_stream.is_empty());
    }
1758
    /// Regression test: the redecryptor must NOT hold the room state write
    /// lock while calling replace_utds() on event-focused caches, otherwise
    /// an ABBA deadlock occurs with concurrent event-focused cache pagination.
    ///
    /// Sequence: a pagination task grabs the event-focused cache lock and
    /// blocks on a slow `/messages` response; meanwhile the redecryptor
    /// processes a room key. If the redecryptor held the room state write
    /// lock across `replace_utds()`, the two tasks would wait on each other's
    /// locks forever; the final `subscribe()` (needing the room state read
    /// lock) verifies that did not happen.
    #[async_test]
    async fn test_redecryptor_no_deadlock_with_event_focused_cache_pagination() {
        use crate::{
            event_cache::EventFocusThreadMode,
            test_utils::mocks::{RoomContextResponseTemplate, RoomMessagesResponseTemplate},
        };

        let room_id = room_id!("!test:localhost");
        let f = EventFactory::new().room(room_id);
        let (alice, bob, server, _) = set_up_clients(room_id, true, false).await;

        let (encrypted_event, room_key) = prepare_room(&server, &f, &alice, &bob, room_id).await;

        let event_cache = bob.event_cache();
        let (room_cache, _drop_handles) = event_cache
            .for_room(room_id)
            .await
            .expect("Bob should have an event cache for the room");

        let (_initial_events, mut subscriber) = room_cache.subscribe().await.unwrap();

        // Sync the encrypted event to Bob. Without the room key, it's a UTD.
        server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_joined_room(
                    JoinedRoomBuilder::new(room_id).add_timeline_event(encrypted_event),
                );
            })
            .await;

        // Consume the UTD update from the subscriber.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                subscriber.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Append { values });
        assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });

        // Create an event-focused cache with a backward pagination gap.
        let focused_event_id = event_id!("$focused");
        let bob_user_id = bob.user_id().unwrap();

        // Mock the `/context` request used to seed the event-focused cache;
        // `start("back-token")` leaves a gap so backward pagination is possible.
        server
            .mock_room_event_context()
            .expect_any_access_token()
            .ok(RoomContextResponseTemplate::new(
                f.text_msg("focused msg")
                    .sender(bob_user_id)
                    .event_id(focused_event_id)
                    .into_event(),
            )
            .start("back-token"))
            .mock_once()
            .mount()
            .await;

        let event_focused_cache = room_cache
            .get_or_create_event_focused_cache(
                focused_event_id.to_owned(),
                20,
                EventFocusThreadMode::Automatic,
            )
            .await
            .unwrap();

        // Mock /messages with a long delay, simulating a slow network.
        server
            .mock_room_messages()
            .expect_any_access_token()
            .ok(RoomMessagesResponseTemplate::default().with_delay(Duration::from_secs(5)))
            .mock_once()
            .mount()
            .await;

        // Start backward pagination on the event-focused cache. This holds the cache
        // write lock across the slow /messages network request.
        let event_focused_cache_clone = event_focused_cache.clone();
        let pagination_task = tokio::spawn(async move {
            let _ = event_focused_cache_clone.paginate_backwards(20).await;
        });

        // Let the pagination task acquire the event-focused cache write lock.
        sleep(Duration::from_millis(200)).await;

        // Send the room key to Bob.
        //
        // The redecryptor background task will pick it up and decrypt the UTD.
        // Previously, on_resolved_utds would hold the room state write lock
        // while calling replace_utds on event-focused caches, creating an ABBA
        // deadlock with pagination (which holds event-focused lock and needs
        // room state lock via save_events).
        server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_to_device_event(
                    room_key
                        .deserialize_as()
                        .expect("We should be able to deserialize the room key"),
                );
            })
            .await;

        // Wait for the redecryptor to process the room key.
        sleep(Duration::from_secs(1)).await;

        // Subscribing requires a room state read lock (which would be awaited forever,
        // before the fix).
        let (_events, _subscriber) = timeout(room_cache.subscribe(), Duration::from_millis(100))
            .await
            .expect("subscribing shouldn't timeout")
            .expect("subscribing should succeed");

        // The pagination future is still parked on the slow mock; drop it so
        // the test doesn't wait out the 5-second delay.
        pagination_task.abort();
    }
1878}