matrix_sdk/event_cache/
redecryptor.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The Redecryptor (affectionately known as R2D2) is a layer and long-running
16//! background task which handles redecryption of events in case we couldn't
17//! decrypt them immediately.
18//!
19//! There are various reasons why a room key might not be available immediately
20//! when the event becomes available:
21//!     - The to-device message containing the room key just arrives late, i.e.
22//!       after the room event.
23//!     - The event is a historic event and we need to first download the room
24//!       key from the backup.
25//!     - The event is a historic event in a previously unjoined room and we need
26//!       to receive historic room keys as defined in [MSC3061].
27//!
28//! R2D2 listens to the [`OlmMachine`] for received room keys and new
29//! `m.room_key.withheld` events.
30//!
31//! If a new room key has been received, it attempts to find any UTDs in the
32//! [`EventCache`]. If R2D2 decrypts any UTDs from the event cache, it will
33//! replace the events in the cache and send out new [`RoomEventCacheUpdate`]s
34//! to any of its listeners.
35//!
36//! If new withheld info has been received, it attempts to find any relevant
37//! events and updates their [`EncryptionInfo`].
38//!
39//! There's an additional gotcha: the [`OlmMachine`] might get recreated by
40//! calls to [`BaseClient::regenerate_olm()`]. When this happens, we will
41//! receive a `None` on the room keys stream and we need to re-listen to it.
42//!
43//! Another gotcha is that room keys might be received on another process if the
44//! [`Client`] is operating on an Apple iOS device. A separate process is used
45//! in this case to receive push notifications. In this case, the room key will
46//! be received and R2D2 won't get notified about it. To work around this,
47//! decryption requests can be explicitly sent to R2D2.
48//!
49//! The final gotcha is that a room key might be received in between the
50//! first attempt to decrypt the event and the moment the event is
51//! persisted in the event cache. To handle this race condition, R2D2 listens
52//! to the event cache and attempts to decrypt any UTDs the event cache
53//! persists.
54//!
55//! In the graph below, the Timeline block is meant to be the `Timeline` from
56//! the `matrix-sdk-ui` crate, but it could be any other listener that
57//! subscribes to the [`RedecryptorReport`] stream.
58//!
59//! ```markdown
60//! 
61//!      .----------------------.
62//!     |                        |
63//!     |      Beeb, boop!       |
64//!     |                        .
65//!      ----------------------._ \
66//!                               -;  _____
67//!                                 .`/L|__`.
68//!                                / =[_]O|` \
69//!                                |"+_____":|
70//!                              __:='|____`-:__
71//!                             ||[] ||====|| []||
72//!                             ||[] ||====|| []||
73//!                             |:== ||====|| ==:|
74//!                             ||[] ||====|| []||
75//!                             ||[] ||====|| []||
76//!                            _||_  ||====||  _||_
77//!                           (====) |:====:| (====)
78//!                            }--{  | |  | |  }--{
79//!                           (____) |_|  |_| (____)
80//!
81//!                              ┌─────────────┐
82//!                              │             │
83//!                  ┌───────────┤   Timeline  │◄────────────┐
84//!                  │           │             │             │
85//!                  │           └──────▲──────┘             │
86//!                  │                  │                    │
87//!                  │                  │                    │
88//!                  │                  │                    │
89//!              Decryption             │                Redecryptor
90//!                request              │                  report
91//!                  │        RoomEventCacheUpdates          │
92//!                  │                  │                    │
93//!                  │                  │                    │
94//!                  │      ┌───────────┴───────────┐        │
95//!                  │      │                       │        │
96//!                  └──────►         R2D2          │────────┘
97//!                         │                       │
98//!                         └──▲─────────────────▲──┘
99//!                            │                 │
100//!                            │                 │
101//!                            │                 │
102//!                         Received        Received room
103//!                          events          keys stream
104//!                            │                 │
105//!                            │                 │
106//!                            │                 │
107//!                    ┌───────┴──────┐  ┌───────┴──────┐
108//!                    │              │  │              │
109//!                    │  Event Cache │  │  OlmMachine  │
110//!                    │              │  │              │
111//!                    └──────────────┘  └──────────────┘
112//! ```
113//!
114//! [MSC3061]: https://github.com/matrix-org/matrix-spec/pull/1655#issuecomment-2213152255
115
116use std::{
117    collections::{BTreeMap, BTreeSet},
118    pin::Pin,
119    sync::Weak,
120};
121
122use as_variant::as_variant;
123use futures_core::Stream;
124use futures_util::{StreamExt, future::try_join_all, pin_mut};
125#[cfg(doc)]
126use matrix_sdk_base::{BaseClient, crypto::OlmMachine};
127use matrix_sdk_base::{
128    crypto::{
129        store::types::{RoomKeyInfo, RoomKeyWithheldInfo},
130        types::events::room::encrypted::EncryptedEvent,
131    },
132    deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind},
133    locks::Mutex,
134    task_monitor::BackgroundTaskHandle,
135    timer,
136};
137#[cfg(doc)]
138use matrix_sdk_common::deserialized_responses::EncryptionInfo;
139use ruma::{
140    OwnedEventId, OwnedRoomId, RoomId,
141    events::{AnySyncTimelineEvent, room::encrypted::OriginalSyncRoomEncryptedEvent},
142    push::Action,
143    serde::Raw,
144};
145use tokio::sync::{
146    broadcast::{self, Sender},
147    mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
148};
149use tokio_stream::wrappers::{
150    BroadcastStream, UnboundedReceiverStream, errors::BroadcastStreamRecvError,
151};
152use tracing::{info, instrument, trace, warn};
153
154#[cfg(doc)]
155use super::RoomEventCache;
156use super::{
157    EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheGenericUpdate,
158    RoomEventCacheUpdate, TimelineVectorDiffs, caches::room::RoomEventCacheLinkedChunkUpdate,
159};
160use crate::{Client, Result, Room, encryption::backups::BackupState, room::PushContext};
161
162type SessionId<'a> = &'a str;
163type OwnedSessionId = String;
164
165type EventIdAndUtd = (OwnedEventId, Raw<AnySyncTimelineEvent>);
166type EventIdAndEvent = (OwnedEventId, DecryptedRoomEvent);
167pub(in crate::event_cache) type ResolvedUtd =
168    (OwnedEventId, DecryptedRoomEvent, Option<Vec<Action>>);
169
170/// The information sent across the channel to the long-running task requesting
171/// that the supplied set of sessions be retried.
172#[derive(Debug, Clone)]
173pub struct DecryptionRetryRequest {
174    /// The room ID of the room the events belong to.
175    pub room_id: OwnedRoomId,
176    /// Session IDs of the events that are not decrypted.
177    pub utd_session_ids: BTreeSet<OwnedSessionId>,
178    /// Session IDs of the events that are decrypted but might need to have
179    /// their [`EncryptionInfo`] refreshed.
180    pub refresh_info_session_ids: BTreeSet<OwnedSessionId>,
181}
182
183/// A report coming from the redecryptor.
184#[derive(Debug, Clone)]
185pub enum RedecryptorReport {
186    /// Events which we were able to decrypt.
187    ResolvedUtds {
188        /// The room ID of the room the events belong to.
189        room_id: OwnedRoomId,
190        /// The list of event IDs of the decrypted events.
191        events: BTreeSet<OwnedEventId>,
192    },
193    /// The redecryptor might have missed some room keys so it might not have
194    /// re-decrypted events that are now decryptable.
195    Lagging,
196    /// A room key backup has become available.
197    ///
198    /// This means that components might want to tell R2D2 about events they
199    /// care about to attempt a decryption.
200    BackupAvailable,
201}
202
203pub(super) struct RedecryptorChannels {
204    utd_reporter: Sender<RedecryptorReport>,
205    pub(super) decryption_request_sender: UnboundedSender<DecryptionRetryRequest>,
206    pub(super) decryption_request_receiver:
207        Mutex<Option<UnboundedReceiver<DecryptionRetryRequest>>>,
208}
209
210impl RedecryptorChannels {
211    pub(super) fn new() -> Self {
212        let (utd_reporter, _) = broadcast::channel(100);
213        let (decryption_request_sender, decryption_request_receiver) = unbounded_channel();
214
215        Self {
216            utd_reporter,
217            decryption_request_sender,
218            decryption_request_receiver: Mutex::new(Some(decryption_request_receiver)),
219        }
220    }
221}
222
223/// A function which can be used to filter and map [`TimelineEvent`]s into a
224/// tuple of event ID and raw [`AnySyncTimelineEvent`].
225///
226/// The tuple can be used to attempt to redecrypt events.
227fn filter_timeline_event_to_utd(
228    event: TimelineEvent,
229) -> Option<(OwnedEventId, Raw<AnySyncTimelineEvent>)> {
230    let event_id = event.event_id().map(ToOwned::to_owned);
231
232    // Only pick out events that are UTDs, get just the Raw event as this is what
233    // the OlmMachine needs.
234    let event = as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event);
235    // Zip the event ID and event together so we don't have to pick out the event ID
236    // again. We need the event ID to replace the event in the cache.
237    event_id.zip(event)
238}
239
240/// A function which can be used to filter and map [`TimelineEvent`]s into a
241/// tuple of event ID and [`DecryptedRoomEvent`].
242///
243/// The tuple can be used to attempt to update the encryption info of the
244/// decrypted event.
245fn filter_timeline_event_to_decrypted(
246    event: TimelineEvent,
247) -> Option<(OwnedEventId, DecryptedRoomEvent)> {
248    let event_id = event.event_id().map(ToOwned::to_owned);
249
250    let event = as_variant!(event.kind, TimelineEventKind::Decrypted(event) => event);
251    // Zip the event ID and event together so we don't have to pick out the event ID
252    // again. We need the event ID to replace the event in the cache.
253    event_id.zip(event)
254}
255
256impl EventCache {
257    /// Retrieve a set of events that we weren't able to decrypt.
258    ///
259    /// # Arguments
260    ///
261    /// * `room_id` - The ID of the room the events were sent to.
262    /// * `session_id` - The unique ID of the room key that was used to encrypt
263    ///   the event.
264    async fn all_encrypted_events(
265        &self,
266        room_id: &RoomId,
267        session_id: SessionId<'_>,
268    ) -> Result<Vec<EventIdAndUtd>, EventCacheError> {
269        let caches = self.inner.all_caches_for_room(room_id).await?;
270
271        Ok(caches
272            .all_events_of_type(Some("m.room.encrypted"), Some(session_id))
273            .await?
274            .filter_map(filter_timeline_event_to_utd)
275            .collect())
276    }
277
278    /// Retrieve a set of events that we weren't able to decrypt from the memory
279    /// of the event cache.
280    async fn all_in_memory_encrypted_events(&self) -> BTreeMap<OwnedRoomId, Vec<EventIdAndUtd>> {
281        let mut utds = BTreeMap::new();
282
283        for (room_id, caches) in self.inner.by_room.read().await.iter() {
284            let room_utds: Vec<_> = caches
285                .all_in_memory_events()
286                .await
287                .into_iter()
288                .flatten()
289                .filter_map(filter_timeline_event_to_utd)
290                .collect();
291
292            utds.insert(room_id.to_owned(), room_utds);
293        }
294
295        utds
296    }
297
298    async fn all_decrypted_events(
299        &self,
300        room_id: &RoomId,
301        session_id: SessionId<'_>,
302    ) -> Result<Vec<EventIdAndEvent>, EventCacheError> {
303        let caches = self.inner.all_caches_for_room(room_id).await?;
304
305        Ok(caches
306            .all_events_of_type(None, Some(session_id))
307            .await?
308            .filter_map(filter_timeline_event_to_decrypted)
309            .collect())
310    }
311
312    async fn all_in_memory_decrypted_events(&self) -> BTreeMap<OwnedRoomId, Vec<EventIdAndEvent>> {
313        let mut decrypted_events = BTreeMap::new();
314
315        for (room_id, caches) in self.inner.by_room.read().await.iter() {
316            let room_events: Vec<_> = caches
317                .all_in_memory_events()
318                .await
319                .into_iter()
320                .flatten()
321                .filter_map(filter_timeline_event_to_decrypted)
322                .collect();
323
324            decrypted_events.insert(room_id.to_owned(), room_events);
325        }
326
327        decrypted_events
328    }
329
330    /// Handle a chunk of events that we were previously unable to decrypt but
331    /// have now successfully decrypted.
332    ///
333    /// This function will replace the existing UTD events in memory and the
334    /// store and send out a [`RoomEventCacheUpdate`] for the newly
335    /// decrypted events.
336    ///
337    /// # Arguments
338    ///
339    /// * `room_id` - The ID of the room the events were sent to.
340    /// * `events` - A chunk of events that were successfully decrypted.
341    #[instrument(skip_all, fields(room_id))]
342    async fn on_resolved_utds(
343        &self,
344        room_id: &RoomId,
345        events: Vec<ResolvedUtd>,
346    ) -> Result<(), EventCacheError> {
347        if events.is_empty() {
348            trace!("No events were redecrypted or updated, nothing to replace");
349            return Ok(());
350        }
351
352        timer!("Resolving UTDs");
353
354        let event_ids: BTreeSet<_> =
355            events.iter().map(|(event_id, _, _)| event_id.clone()).collect();
356
357        let all_caches = self.inner.all_caches_for_room(room_id).await?;
358
359        // Resolve on the room cache.
360        {
361            let room_cache = &all_caches.room;
362            let mut state = room_cache.state().write().await?;
363
364            let mut new_events = Vec::with_capacity(events.len());
365
366            for (event_id, decrypted, actions) in &events {
367                if let Some((location, mut target_event)) = state.find_event(event_id).await?
368                    && (
369                        // There is a race between the multiple sources of updates. It's possible
370                        // that two sources trigger a decryption for the same event (for example,
371                        // the room key stream and the event cache updates). It is then likely that
372                        // the event has been already resolved. This race is fine, but we should
373                        // avoid replacing an event that has already been resolved as it is a
374                        // non-negligible operation.
375                        //
376                        // Note that a simple check like “event's kind is `UnableToDecrypt`” is not
377                        // enough. The event can already be decrypted but its encryption info can
378                        // change. So we must ensure they are also different.
379                        matches!(target_event.kind, TimelineEventKind::UnableToDecrypt { .. })
380                            || target_event.encryption_info() != Some(&decrypted.encryption_info)
381                    )
382                {
383                    target_event.kind = TimelineEventKind::Decrypted(decrypted.clone());
384
385                    if let Some(actions) = actions {
386                        target_event.set_push_actions(actions.clone());
387                    }
388
389                    // TODO: `replace_event_at()` propagates changes to the store for every
390                    // event, we should probably have a bulk version of this?
391                    state.replace_event_at(location, target_event.clone()).await?;
392                    new_events.push(target_event);
393                }
394            }
395
396            // Read receipt events aren't encrypted, so we can't have decrypted a new
397            // one here. As a result, we don't have any new receipt events to
398            // post-process, so we can just pass `None` here.
399            //
400            // Note: read receipts may be updated anyhow in the post-processing step,
401            // as the redecryption may have decrypted some events that don't count as
402            // unreads.
403            let receipt_event = None;
404
405            state.post_process_new_events(new_events, receipt_event).await?;
406
407            let updates_as_vector_diffs = state.room_linked_chunk_mut().updates_as_vector_diffs();
408
409            if !updates_as_vector_diffs.is_empty() {
410                room_cache.update_sender().send(
411                    RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
412                        diffs: updates_as_vector_diffs,
413                        origin: EventsOrigin::Cache,
414                    }),
415                    Some(RoomEventCacheGenericUpdate { room_id: room_id.to_owned() }),
416                );
417            }
418        }
419
420        // Resolve on the thread caches.
421        {
422            // TODO: This ain't great for performance; there shouldn't be
423            // that many thread caches alive at the same time, but they could
424            // accumulate over time. Consider keeping track of which linked
425            // chunk contains which event ID, to avoid doing the linear searches
426            // here.
427
428            // Replace UTDs in each thread, and maybe update the thread summary.
429            for (thread_id, thread_cache) in try_join_all(
430                all_caches.threads.read().await.iter().map(|(thread_id, thread_cache)| async {
431                    Result::<_, EventCacheError>::Ok(
432                        // If at least one event has been replaced, return the `thread_id` and the
433                        // `thread_cache` to update the thread summary later.
434                        thread_cache
435                            .replace_utds(&events)
436                            .await?
437                            .then(|| (thread_id.clone(), thread_cache.clone())),
438                    )
439                }),
440            )
441            .await?
442            .into_iter()
443            // Filter out results that are `None`, i.e. a thread where no UTD has been replaced.
444            .flatten()
445            {
446                let new_thread_summary =
447                    thread_cache.state().read().await?.compute_thread_summary().await?;
448
449                all_caches.room.update_thread_summary(&thread_id, new_thread_summary).await?;
450            }
451        }
452
453        // Resolve on the pinned-events cache.
454        if let Some(pinned_events_cache) = all_caches.pinned_events.get() {
455            pinned_events_cache.replace_utds(&events).await?;
456        }
457
458        // Resolve on the event-focused caches.
459        {
460            // TODO: This ain't great for performance; there shouldn't be that many
461            // event-focused caches alive at the same time, but they could
462            // accumulate over time. Consider keeping track of which linked chunk
463            // contains which event ID, to avoid doing the linear searches here.
464            try_join_all(
465                all_caches
466                    .event_focused
467                    .read()
468                    .await
469                    .values()
470                    .map(|event_focused_cache| event_focused_cache.replace_utds(&events)),
471            )
472            .await?;
473        }
474
475        let report =
476            RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids };
477        let _ = self.inner.redecryption_channels.utd_reporter.send(report);
478
479        Ok(())
480    }
481
482    /// Attempt to decrypt a single event.
483    async fn decrypt_event(
484        &self,
485        room_id: &RoomId,
486        room: Option<&Room>,
487        push_context: Option<&PushContext>,
488        event: &Raw<EncryptedEvent>,
489    ) -> Option<(DecryptedRoomEvent, Option<Vec<Action>>)> {
490        if let Some(room) = room {
491            match room
492                .decrypt_event(
493                    event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
494                    push_context,
495                )
496                .await
497            {
498                Ok(maybe_decrypted) => {
499                    let actions = maybe_decrypted.push_actions().map(|a| a.to_vec());
500
501                    if let TimelineEventKind::Decrypted(decrypted) = maybe_decrypted.kind {
502                        Some((decrypted, actions))
503                    } else {
504                        warn!(
505                            "Failed to redecrypt an event despite receiving a room key or a request to redecrypt"
506                        );
507                        None
508                    }
509                }
510                Err(e) => {
511                    warn!(
512                        "Failed to redecrypt an event despite receiving a room key or a request to redecrypt {e:?}"
513                    );
514                    None
515                }
516            }
517        } else {
518            let client = self.inner.client().ok()?;
519            let machine = client.olm_machine().await;
520            let machine = machine.as_ref()?;
521
522            match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await {
523                Ok(decrypted) => Some((decrypted, None)),
524                Err(e) => {
525                    warn!(
526                        "Failed to redecrypt an event despite receiving a room key or a request to redecrypt {e:?}"
527                    );
528                    None
529                }
530            }
531        }
532    }
533
534    /// Attempt to redecrypt events after a room key with the given session ID
535    /// has been received.
536    #[instrument(skip_all, fields(room_id, session_id))]
537    async fn retry_decryption(
538        &self,
539        room_id: &RoomId,
540        session_id: SessionId<'_>,
541    ) -> Result<(), EventCacheError> {
542        // Get all the relevant UTDs.
543        let events = self.all_encrypted_events(room_id, session_id).await?;
544        self.retry_decryption_for_events(room_id, events).await
545    }
546
547    /// Attempt to redecrypt events that were persisted in the event cache.
548    #[instrument(skip_all, fields(updates.linked_chunk_id))]
549    async fn retry_decryption_for_event_cache_updates(
550        &self,
551        updates: RoomEventCacheLinkedChunkUpdate,
552    ) -> Result<(), EventCacheError> {
553        let room_id = updates.linked_chunk_id.room_id();
554        let events: Vec<_> = updates
555            .updates
556            .into_iter()
557            .flat_map(|updates| updates.into_items())
558            .filter_map(filter_timeline_event_to_utd)
559            .collect();
560
561        self.retry_decryption_for_events(room_id, events).await
562    }
563
564    async fn retry_decryption_for_in_memory_events(&self) {
565        let utds = self.all_in_memory_encrypted_events().await;
566
567        for (room_id, utds) in utds.into_iter() {
568            if let Err(e) = self.retry_decryption_for_events(&room_id, utds).await {
569                warn!(%room_id, "Failed to redecrypt in-memory events {e:?}");
570            }
571        }
572    }
573
574    /// Attempt to redecrypt a chunk of UTDs.
575    #[instrument(skip_all, fields(room_id, session_id))]
576    async fn retry_decryption_for_events(
577        &self,
578        room_id: &RoomId,
579        events: Vec<EventIdAndUtd>,
580    ) -> Result<(), EventCacheError> {
581        trace!("Retrying to decrypt");
582
583        if events.is_empty() {
584            trace!("No relevant events found.");
585            return Ok(());
586        }
587
588        let room = self.inner.client().ok().and_then(|client| client.get_room(room_id));
589        let push_context =
590            if let Some(room) = &room { room.push_context().await.ok().flatten() } else { None };
591
592        // Let's attempt to decrypt them.
593        let mut decrypted_events = Vec::with_capacity(events.len());
594
595        for (event_id, event) in events {
596            // If we managed to decrypt the event, which we should since we received
597            // the room key for this specific event, then replace the event.
598            if let Some((decrypted, actions)) = self
599                .decrypt_event(
600                    room_id,
601                    room.as_ref(),
602                    push_context.as_ref(),
603                    event.cast_ref_unchecked(),
604                )
605                .await
606            {
607                decrypted_events.push((event_id, decrypted, actions));
608            }
609        }
610
611        let event_ids: BTreeSet<_> =
612            decrypted_events.iter().map(|(event_id, _, _)| event_id).collect();
613
614        if !event_ids.is_empty() {
615            trace!(?event_ids, "Successfully redecrypted events");
616        }
617
618        // Replace the events and notify listeners that UTDs have been replaced with
619        // decrypted events.
620        self.on_resolved_utds(room_id, decrypted_events).await?;
621
622        Ok(())
623    }
624
625    /// Attempt to update the encryption info for the given list of events.
626    async fn update_encryption_info_for_events(
627        &self,
628        room: &Room,
629        events: Vec<EventIdAndEvent>,
630    ) -> Result<(), EventCacheError> {
631        // Let's attempt to update their encryption info.
632        let mut updated_events = Vec::with_capacity(events.len());
633
634        for (event_id, mut event) in events {
635            if let Some(session_id) = event.encryption_info.session_id() {
636                let new_encryption_info =
637                    room.get_encryption_info(session_id, &event.encryption_info.sender).await;
638
639                // Only create a replacement if the encryption info actually changed.
640                if let Some(new_encryption_info) = new_encryption_info
641                    && event.encryption_info != new_encryption_info
642                {
643                    event.encryption_info = new_encryption_info;
644                    updated_events.push((event_id, event, None));
645                }
646            }
647        }
648
649        let event_ids: BTreeSet<_> =
650            updated_events.iter().map(|(event_id, _, _)| event_id).collect();
651
652        if !event_ids.is_empty() {
653            trace!(?event_ids, "Replacing the encryption info of some events");
654        }
655
656        self.on_resolved_utds(room.room_id(), updated_events).await
657    }
658
659    #[instrument(skip_all, fields(room_id, session_id))]
660    async fn update_encryption_info(
661        &self,
662        room_id: &RoomId,
663        session_id: SessionId<'_>,
664    ) -> Result<(), EventCacheError> {
665        trace!("Updating encryption info");
666
667        let Ok(client) = self.inner.client() else {
668            return Ok(());
669        };
670
671        let Some(room) = client.get_room(room_id) else {
672            return Ok(());
673        };
674
675        // Get all the relevant events.
676        let events = self.all_decrypted_events(room_id, session_id).await?;
677
678        if events.is_empty() {
679            trace!("No relevant events found.");
680            return Ok(());
681        }
682
683        // Let's attempt to update their encryption info.
684        self.update_encryption_info_for_events(&room, events).await
685    }
686
687    async fn retry_update_encryption_info_for_in_memory_events(&self) {
688        let decrypted_events = self.all_in_memory_decrypted_events().await;
689
690        for (room_id, events) in decrypted_events.into_iter() {
691            let Some(room) = self.inner.client().ok().and_then(|c| c.get_room(&room_id)) else {
692                continue;
693            };
694
695            if let Err(e) = self.update_encryption_info_for_events(&room, events).await {
696                warn!(
697                    %room_id,
698                    "Failed to replace the encryption info for in-memory events {e:?}"
699                );
700            }
701        }
702    }
703
704    /// Retry to decrypt and update the encryption info of all the events
705    /// contained in the memory part of the event cache.
706    ///
707    /// This list of events will map one-to-one to the events that components
708    /// subscribed to the event cache have received and are keeping cached.
709    ///
710    /// If components subscribed to the event cache are doing additional
711    /// caching, they'll need to listen to [`RedecryptorReport`]s and
712    /// explicitly request redecryption attempts using
713    /// [`EventCache::request_decryption`].
714    async fn retry_in_memory_events(&self) {
715        self.retry_decryption_for_in_memory_events().await;
716        self.retry_update_encryption_info_for_in_memory_events().await;
717    }
718
719    /// Explicitly request the redecryption of a set of events.
720    ///
721    /// The redecryption logic in the event cache might sometimes miss that a
722    /// room key has become available and that a certain set of events has
723    /// become decryptable.
724    ///
725    /// This might happen because some room keys might arrive in a separate
726    /// process handling push notifications or if a room key arrives but the
727    /// process shuts down before we could have decrypted the events.
728    ///
729    /// For this reason it is useful to tell the event cache explicitly that
730    /// decryption should be retried for some events.
731    ///
732    /// This method allows you to do so. The events that get decrypted, if any,
733    /// will be advertised over the usual event cache subscription mechanism
734    /// which can be accessed using the [`RoomEventCache::subscribe()`]
735    /// method.
736    ///
737    /// # Examples
738    ///
739    /// ```no_run
740    /// # use matrix_sdk::{Client, event_cache::DecryptionRetryRequest};
741    /// # use url::Url;
742    /// # use ruma::owned_room_id;
743    /// # use std::collections::BTreeSet;
744    /// # async {
745    /// # let homeserver = Url::parse("http://localhost:8080")?;
746    /// # let client = Client::new(homeserver).await?;
747    /// let event_cache = client.event_cache();
748    /// let room_id = owned_room_id!("!my_room:localhost");
749    ///
750    /// let request = DecryptionRetryRequest {
751    ///     room_id,
752    ///     utd_session_ids: BTreeSet::from(["session_id".into()]),
753    ///     refresh_info_session_ids: BTreeSet::new(),
754    /// };
755    ///
756    /// event_cache.request_decryption(request);
757    /// # anyhow::Ok(()) };
758    /// ```
759    pub fn request_decryption(&self, request: DecryptionRetryRequest) {
760        let _ =
761            self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err(
762                |_| warn!("Requesting a decryption while the redecryption task has been shut down"),
763            );
764    }
765
766    /// Subscribe to reports that the redecryptor generates.
767    ///
768    /// The redecryption logic in the event cache might sometimes miss that a
769    /// room key has become available and that a certain set of events has
770    /// become decryptable.
771    ///
772    /// This might happen because some room keys might arrive in a separate
773    /// process handling push notifications or if room keys arrive faster than
774    /// we can handle them.
775    ///
776    /// This stream can be used to get notified about such situations as well as
777    /// a general channel where the event cache reports which events got
778    /// successfully redecrypted.
779    ///
780    /// # Examples
781    ///
782    /// ```no_run
783    /// # use matrix_sdk::{Client, event_cache::RedecryptorReport};
784    /// # use url::Url;
785    /// # use tokio_stream::StreamExt;
786    /// # async {
787    /// # let homeserver = Url::parse("http://localhost:8080")?;
788    /// # let client = Client::new(homeserver).await?;
789    /// let event_cache = client.event_cache();
790    ///
791    /// let mut stream = event_cache.subscribe_to_decryption_reports();
792    ///
793    /// while let Some(Ok(report)) = stream.next().await {
794    ///     match report {
795    ///         RedecryptorReport::Lagging => {
796    ///             // The event cache might have failed to redecrypt some events. We should tell
797    ///             // it which events we care about, i.e. which events we're displaying to the
798    ///             // user, and let it redecrypt things with an explicit request.
799    ///         }
800    ///         RedecryptorReport::BackupAvailable => {
801    ///             // A backup has become available. We can, just like in the Lagging case, tell
802    ///             // the event cache to attempt to redecrypt some events.
803    ///             //
804    ///             // This is only necessary with the BackupDownloadStrategy::OnDecryptionFailure
805    ///             // as the decryption attempt in this case will trigger the download of the
806    ///             // room key from the backup.
807    ///         }
808    ///         RedecryptorReport::ResolvedUtds { .. } => {
809    ///             // This may be interesting for statistical reasons or in case we'd like to
810    ///             // fetch and inspect these events in some manner.
811    ///         }
812    ///     }
813    /// }
814    /// # anyhow::Ok(()) };
815    /// ```
816    pub fn subscribe_to_decryption_reports(
817        &self,
818    ) -> impl Stream<Item = Result<RedecryptorReport, BroadcastStreamRecvError>> {
819        BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe())
820    }
821}
822
823#[inline(always)]
824fn upgrade_event_cache(cache: &Weak<EventCacheInner>) -> Option<EventCache> {
825    cache.upgrade().map(|inner| EventCache { inner })
826}
827
828async fn send_report_and_retry_memory_events(
829    cache: &Weak<EventCacheInner>,
830    report: RedecryptorReport,
831) -> Result<(), ()> {
832    let Some(cache) = upgrade_event_cache(cache) else {
833        return Err(());
834    };
835
836    cache.retry_in_memory_events().await;
837    let _ = cache.inner.redecryption_channels.utd_reporter.send(report);
838
839    Ok(())
840}
841
842/// Struct holding on to the redecryption task.
843///
844/// This struct implements the bulk of the redecryption task. It listens to the
845/// various streams that should trigger redecryption attempts.
846///
847/// For more info see the [module level docs](self).
848pub(crate) struct Redecryptor {
849    _task: BackgroundTaskHandle,
850}
851
852impl Redecryptor {
853    /// Create a new [`Redecryptor`].
854    ///
855    /// This creates a task that listens to various streams and attempts to
856    /// redecrypt UTDs that can be found inside the [`EventCache`].
857    pub(super) fn new(
858        client: &Client,
859        cache: Weak<EventCacheInner>,
860        receiver: UnboundedReceiver<DecryptionRetryRequest>,
861        linked_chunk_update_sender: &Sender<RoomEventCacheLinkedChunkUpdate>,
862    ) -> Self {
863        let linked_chunk_stream = BroadcastStream::new(linked_chunk_update_sender.subscribe());
864        let backup_state_stream = client.encryption().backups().state_stream();
865
866        let task = client
867            .task_monitor()
868            .spawn_infinite_task("event_cache::redecryptor", async {
869                let request_redecryption_stream = UnboundedReceiverStream::new(receiver);
870
871                Self::listen_for_room_keys_task(
872                    cache,
873                    request_redecryption_stream,
874                    linked_chunk_stream,
875                    backup_state_stream,
876                )
877                .await;
878            })
879            .abort_on_drop();
880
881        Self { _task: task }
882    }
883
884    /// (Re)-subscribe to the room key stream from the [`OlmMachine`].
885    ///
886    /// This needs to happen any time this stream returns `None`, meaning that
887    /// the sending part of the stream has been dropped.
888    async fn subscribe_to_room_key_stream(
889        cache: &Weak<EventCacheInner>,
890    ) -> Option<(
891        impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>,
892        impl Stream<Item = Vec<RoomKeyWithheldInfo>>,
893    )> {
894        let event_cache = cache.upgrade()?;
895        let client = event_cache.client().ok()?;
896        let machine = client.olm_machine().await;
897
898        machine.as_ref().map(|m| {
899            (m.store().room_keys_received_stream(), m.store().room_keys_withheld_received_stream())
900        })
901    }
902
903    async fn redecryption_loop(
904        cache: &Weak<EventCacheInner>,
905        decryption_request_stream: &mut Pin<&mut impl Stream<Item = DecryptionRetryRequest>>,
906        events_stream: &mut Pin<
907            &mut impl Stream<Item = Result<RoomEventCacheLinkedChunkUpdate, BroadcastStreamRecvError>>,
908        >,
909        backup_state_stream: &mut Pin<
910            &mut impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
911        >,
912    ) -> bool {
913        let Some((room_key_stream, withheld_stream)) =
914            Self::subscribe_to_room_key_stream(cache).await
915        else {
916            return false;
917        };
918
919        pin_mut!(room_key_stream);
920        pin_mut!(withheld_stream);
921
922        loop {
923            tokio::select! {
924                // An explicit request, presumably from the timeline, has been received to decrypt
925                // events that were encrypted with a certain room key.
926                Some(request) = decryption_request_stream.next() => {
927                        let Some(cache) = upgrade_event_cache(cache) else {
928                            break false;
929                        };
930
931                        trace!(?request, "Received a redecryption request");
932
933                        for session_id in request.utd_session_ids {
934                            let _ = cache
935                                .retry_decryption(&request.room_id, &session_id)
936                                .await
937                                .inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}"));
938                        }
939
940                        for session_id in request.refresh_info_session_ids {
941                            let _ = cache.update_encryption_info(&request.room_id, &session_id).await.inspect_err(|e|
942                                warn!(
943                                    room_id = %request.room_id,
944                                    session_id = session_id,
945                                    "Unable to update the encryption info {e:?}",
946                            ));
947                        }
948                }
949                // The room key stream from the OlmMachine. Needs to be recreated every time we
950                // receive a `None` from the stream.
951                room_keys = room_key_stream.next() => {
952                    match room_keys {
953                        Some(Ok(room_keys)) => {
954                            // Alright, some room keys were received and persisted in our store,
955                            // let's attempt to redecrypt events that were encrypted using these
956                            // room keys.
957                            let Some(cache) = upgrade_event_cache(cache) else {
958                                break false;
959                            };
960
961                            trace!(?room_keys, "Received new room keys");
962
963                            for key in &room_keys {
964                                let _ = cache
965                                    .retry_decryption(&key.room_id, &key.session_id)
966                                    .await
967                                    .inspect_err(|e| warn!("Error redecrypting {e:?}"));
968                            }
969
970                            for key in room_keys {
971                                let _ = cache.update_encryption_info(&key.room_id, &key.session_id).await.inspect_err(|e|
972                                    warn!(
973                                        room_id = %key.room_id,
974                                        session_id = key.session_id,
975                                        "Unable to update the encryption info {e:?}",
976                                ));
977                            }
978                        },
979                        Some(Err(_)) => {
980                            // We missed some room keys, we need to report this in case a listener
981                            // has an idea which UTDs we should attempt to redecrypt.
982                            //
983                            // This would most likely be the timeline from the UI crate. The
984                            // timeline might attempt to redecrypt all UTDs it is showing to the
985                            // user.
986                            warn!("The room key stream lagged, reporting the lag to our listeners");
987
988                            if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
989                                break false;
990                            }
991                        },
992                        // The stream got closed, this could mean that our OlmMachine got
993                        // regenerated, let's return true and try to recreate the stream.
994                        None => {
995                            break true;
996                        }
997                    }
998                }
999                withheld_info = withheld_stream.next() => {
1000                    match withheld_info {
1001                        Some(infos) => {
1002                            let Some(cache) = upgrade_event_cache(cache) else {
1003                                break false;
1004                            };
1005
1006                            trace!(?infos, "Received new withheld infos");
1007
1008                            for RoomKeyWithheldInfo { room_id, session_id, .. } in &infos {
1009                                let _ = cache.update_encryption_info(room_id, session_id).await.inspect_err(|e|
1010                                    warn!(
1011                                        room_id = %room_id,
1012                                        session_id = session_id,
1013                                        "Unable to update the encryption info {e:?}",
1014                                ));
1015                            }
1016                        }
1017                        // The stream got closed, same as for the room key stream, we'll try to
1018                        // recreate the streams.
1019                        None => break true,
1020                    }
1021                }
1022                // Events that the event cache handled. If the event cache received any UTDs, let's
1023                // attempt to redecrypt them in case the room key was received before the event
1024                // cache was able to return them using `get_utds()`.
1025                Some(event_updates) = events_stream.next() => {
1026                    match event_updates {
1027                        Ok(updates) => {
1028                            let Some(cache) = upgrade_event_cache(cache) else {
1029                                break false;
1030                            };
1031
1032                            let linked_chunk_id = updates.linked_chunk_id.to_owned();
1033
1034                            let _ = cache.retry_decryption_for_event_cache_updates(updates).await.inspect_err(|e|
1035                                warn!(
1036                                    %linked_chunk_id,
1037                                    "Unable to handle UTDs from event cache updates {e:?}",
1038                                )
1039                            );
1040                        }
1041                        Err(_) => {
1042                            if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
1043                                break false;
1044                            }
1045                        }
1046                    }
1047                }
1048                Some(backup_state_update) = backup_state_stream.next() => {
1049                    match backup_state_update {
1050                        Ok(state) => {
1051                            match state {
1052                                BackupState::Unknown |
1053                                BackupState::Creating |
1054                                BackupState::Enabling |
1055                                BackupState::Resuming |
1056                                BackupState::Downloading |
1057                                BackupState::Disabling => {
1058                                    // Those states aren't particularly interesting to components
1059                                    // listening to R2D2 reports.
1060                                }
1061                                BackupState::Enabled => {
1062                                    // Alright, the backup got enabled, we might or might not have
1063                                    // downloaded the room keys from the backup. In case they get
1064                                    // downloaded on-demand, let's try to decrypt all the events we
1065                                    // have cached in-memory.
1066                                    if send_report_and_retry_memory_events(cache, RedecryptorReport::BackupAvailable).await.is_err() {
1067                                        break false;
1068                                    }
1069                                }
1070                            }
1071                        }
1072                        Err(_) => {
1073                            if send_report_and_retry_memory_events(cache, RedecryptorReport::Lagging).await.is_err() {
1074                                break false;
1075                            }
1076                        }
1077                    }
1078                }
1079                else => break false,
1080            }
1081        }
1082    }
1083
1084    async fn listen_for_room_keys_task(
1085        cache: Weak<EventCacheInner>,
1086        decryption_request_stream: UnboundedReceiverStream<DecryptionRetryRequest>,
1087        events_stream: BroadcastStream<RoomEventCacheLinkedChunkUpdate>,
1088        backup_state_stream: impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
1089    ) {
1090        // We pin the decryption request stream here since that one doesn't need to be
1091        // recreated and we don't want to miss messages coming from the stream
1092        // while recreating it unnecessarily.
1093        pin_mut!(decryption_request_stream);
1094        pin_mut!(events_stream);
1095        pin_mut!(backup_state_stream);
1096
1097        while Self::redecryption_loop(
1098            &cache,
1099            &mut decryption_request_stream,
1100            &mut events_stream,
1101            &mut backup_state_stream,
1102        )
1103        .await
1104        {
1105            info!("Regenerating the re-decryption streams");
1106
1107            // Report that the stream got recreated so listeners know about it, and at
1108            // the same time retry decrypting anything we have cached in memory.
1109            if send_report_and_retry_memory_events(&cache, RedecryptorReport::Lagging)
1110                .await
1111                .is_err()
1112            {
1113                break;
1114            }
1115        }
1116
1117        info!("Shutting down the event cache redecryptor");
1118    }
1119}
1120
1121#[cfg(not(target_family = "wasm"))]
1122#[cfg(test)]
1123mod tests {
1124    use std::{
1125        collections::BTreeSet,
1126        sync::{
1127            Arc,
1128            atomic::{AtomicBool, Ordering},
1129        },
1130        time::Duration,
1131    };
1132
1133    use assert_matches2::assert_matches;
1134    use async_trait::async_trait;
1135    use eyeball_im::VectorDiff;
1136    use matrix_sdk_base::{
1137        cross_process_lock::CrossProcessLockGeneration,
1138        crypto::types::events::{ToDeviceEvent, room::encrypted::ToDeviceEncryptedEventContent},
1139        deserialized_responses::{TimelineEventKind, VerificationState},
1140        event_cache::{
1141            Event, Gap,
1142            store::{EventCacheStore, EventCacheStoreError, MemoryStore},
1143        },
1144        linked_chunk::{
1145            ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
1146            RawChunk, Update,
1147        },
1148        locks::Mutex,
1149        sleep::sleep,
1150        store::StoreConfig,
1151    };
1152    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
1153    use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
1154    use ruma::{
1155        EventId, OwnedEventId, RoomId, RoomVersionId, device_id, event_id,
1156        events::{AnySyncTimelineEvent, relation::RelationType},
1157        room_id,
1158        serde::Raw,
1159        user_id,
1160    };
1161    use serde_json::json;
1162    use tokio::sync::oneshot::{self, Sender};
1163    use tracing::{Instrument, info};
1164
1165    use crate::{
1166        Client, assert_let_timeout,
1167        encryption::EncryptionSettings,
1168        event_cache::{
1169            DecryptionRetryRequest, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
1170            TimelineVectorDiffs,
1171        },
1172        test_utils::mocks::MatrixMockServer,
1173    };
1174
1175    /// A wrapper for the memory store for the event cache.
1176    ///
1177    /// Delays the persisting of events, or linked chunk updates, to allow the
1178    /// testing of race conditions between the event cache and R2D2.
1179    #[derive(Debug, Clone)]
1180    struct DelayingStore {
1181        memory_store: MemoryStore,
1182        delaying: Arc<AtomicBool>,
1183        foo: Arc<Mutex<Option<Sender<()>>>>,
1184    }
1185
1186    impl DelayingStore {
1187        fn new() -> Self {
1188            Self {
1189                memory_store: MemoryStore::new(),
1190                delaying: AtomicBool::new(true).into(),
1191                foo: Arc::new(Mutex::new(None)),
1192            }
1193        }
1194
1195        async fn stop_delaying(&self) {
1196            let (sender, receiver) = oneshot::channel();
1197
1198            {
1199                *self.foo.lock() = Some(sender);
1200            }
1201
1202            self.delaying.store(false, Ordering::SeqCst);
1203
1204            receiver.await.expect("We should be able to receive a response")
1205        }
1206    }
1207
1208    #[cfg_attr(target_family = "wasm", async_trait(?Send))]
1209    #[cfg_attr(not(target_family = "wasm"), async_trait)]
1210    impl EventCacheStore for DelayingStore {
1211        type Error = EventCacheStoreError;
1212
1213        async fn close(&self) -> Result<(), EventCacheStoreError> {
1214            self.memory_store.close().await
1215        }
1216
1217        async fn reopen(&self) -> Result<(), EventCacheStoreError> {
1218            self.memory_store.reopen().await
1219        }
1220
1221        async fn try_take_leased_lock(
1222            &self,
1223            lease_duration_ms: u32,
1224            key: &str,
1225            holder: &str,
1226        ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
1227            self.memory_store.try_take_leased_lock(lease_duration_ms, key, holder).await
1228        }
1229
1230        async fn handle_linked_chunk_updates(
1231            &self,
1232            linked_chunk_id: LinkedChunkId<'_>,
1233            updates: Vec<Update<Event, Gap>>,
1234        ) -> Result<(), Self::Error> {
1235            // This is the key behaviour of this store - we wait to persist the updates
1236            // until someone calls `stop_delaying`.
1237            //
1238            // We use `sleep` here for simplicity. The cool way would be to use a custom
1239            // waker or something like that.
1240            while self.delaying.load(Ordering::SeqCst) {
1241                sleep(Duration::from_millis(10)).await;
1242            }
1243
1244            let sender = self.foo.lock().take();
1245            let ret = self.memory_store.handle_linked_chunk_updates(linked_chunk_id, updates).await;
1246
1247            if let Some(sender) = sender {
1248                sender.send(()).expect("We should be able to notify the other side that we're done with the storage operation");
1249            }
1250
1251            ret
1252        }
1253
1254        async fn load_all_chunks(
1255            &self,
1256            linked_chunk_id: LinkedChunkId<'_>,
1257        ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
1258            self.memory_store.load_all_chunks(linked_chunk_id).await
1259        }
1260
1261        async fn load_all_chunks_metadata(
1262            &self,
1263            linked_chunk_id: LinkedChunkId<'_>,
1264        ) -> Result<Vec<ChunkMetadata>, Self::Error> {
1265            self.memory_store.load_all_chunks_metadata(linked_chunk_id).await
1266        }
1267
1268        async fn load_last_chunk(
1269            &self,
1270            linked_chunk_id: LinkedChunkId<'_>,
1271        ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
1272            self.memory_store.load_last_chunk(linked_chunk_id).await
1273        }
1274
1275        async fn load_previous_chunk(
1276            &self,
1277            linked_chunk_id: LinkedChunkId<'_>,
1278            before_chunk_identifier: ChunkIdentifier,
1279        ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
1280            self.memory_store.load_previous_chunk(linked_chunk_id, before_chunk_identifier).await
1281        }
1282
1283        async fn clear_all_events(&self) -> Result<(), Self::Error> {
1284            self.memory_store.clear_all_events().await
1285        }
1286
1287        async fn filter_duplicated_events(
1288            &self,
1289            linked_chunk_id: LinkedChunkId<'_>,
1290            events: Vec<OwnedEventId>,
1291        ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
1292            self.memory_store.filter_duplicated_events(linked_chunk_id, events).await
1293        }
1294
1295        async fn find_event(
1296            &self,
1297            room_id: &RoomId,
1298            event_id: &EventId,
1299        ) -> Result<Option<Event>, Self::Error> {
1300            self.memory_store.find_event(room_id, event_id).await
1301        }
1302
1303        async fn find_event_relations(
1304            &self,
1305            room_id: &RoomId,
1306            event_id: &EventId,
1307            filters: Option<&[RelationType]>,
1308        ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1309            self.memory_store.find_event_relations(room_id, event_id, filters).await
1310        }
1311
1312        async fn get_room_events(
1313            &self,
1314            room_id: &RoomId,
1315            event_type: Option<&str>,
1316            session_id: Option<&str>,
1317        ) -> Result<Vec<Event>, Self::Error> {
1318            self.memory_store.get_room_events(room_id, event_type, session_id).await
1319        }
1320
1321        async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1322            self.memory_store.save_event(room_id, event).await
1323        }
1324
1325        async fn optimize(&self) -> Result<(), Self::Error> {
1326            self.memory_store.optimize().await
1327        }
1328
1329        async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1330            self.memory_store.get_size().await
1331        }
1332    }
1333
1334    async fn set_up_clients(
1335        room_id: &RoomId,
1336        alice_enables_cross_signing: bool,
1337        use_delayed_store: bool,
1338    ) -> (Client, Client, MatrixMockServer, Option<DelayingStore>) {
1339        let alice_span = tracing::info_span!("alice");
1340        let bob_span = tracing::info_span!("bob");
1341
1342        let alice_user_id = user_id!("@alice:localhost");
1343        let alice_device_id = device_id!("ALICEDEVICE");
1344        let bob_user_id = user_id!("@bob:localhost");
1345        let bob_device_id = device_id!("BOBDEVICE");
1346
1347        let matrix_mock_server = MatrixMockServer::new().await;
1348        matrix_mock_server.mock_crypto_endpoints_preset().await;
1349
1350        let encryption_settings = EncryptionSettings {
1351            auto_enable_cross_signing: alice_enables_cross_signing,
1352            ..Default::default()
1353        };
1354
1355        // Create some clients for Alice and Bob.
1356
1357        let alice = matrix_mock_server
1358            .client_builder_for_crypto_end_to_end(alice_user_id, alice_device_id)
1359            .on_builder(|builder| {
1360                builder
1361                    .with_enable_share_history_on_invite(true)
1362                    .with_encryption_settings(encryption_settings)
1363            })
1364            .build()
1365            .instrument(alice_span.clone())
1366            .await;
1367
1368        let encryption_settings =
1369            EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() };
1370
1371        let (store_config, store) = if use_delayed_store {
1372            let store = DelayingStore::new();
1373
1374            (
1375                StoreConfig::new(CrossProcessLockConfig::multi_process(
1376                    "delayed_store_event_cache_test",
1377                ))
1378                .event_cache_store(store.clone()),
1379                Some(store),
1380            )
1381        } else {
1382            (
1383                StoreConfig::new(CrossProcessLockConfig::multi_process(
1384                    "normal_store_event_cache_test",
1385                )),
1386                None,
1387            )
1388        };
1389
1390        let bob = matrix_mock_server
1391            .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id)
1392            .on_builder(|builder| {
1393                builder
1394                    .with_enable_share_history_on_invite(true)
1395                    .with_encryption_settings(encryption_settings)
1396                    .store_config(store_config)
1397            })
1398            .build()
1399            .instrument(bob_span.clone())
1400            .await;
1401
1402        bob.event_cache().subscribe().expect("Bob should be able to enable the event cache");
1403
1404        // Ensure that Alice and Bob are aware of their devices and identities.
1405        matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await;
1406
1407        let event_factory = EventFactory::new().room(room_id).sender(alice_user_id);
1408
1409        // Let us now create a room for them.
1410        let room_builder = JoinedRoomBuilder::new(room_id)
1411            .add_state_event(event_factory.create(alice_user_id, RoomVersionId::V1))
1412            .add_state_event(event_factory.room_encryption());
1413
1414        matrix_mock_server
1415            .mock_sync()
1416            .ok_and_run(&alice, |builder| {
1417                builder.add_joined_room(room_builder.clone());
1418            })
1419            .instrument(alice_span)
1420            .await;
1421
1422        matrix_mock_server
1423            .mock_sync()
1424            .ok_and_run(&bob, |builder| {
1425                builder.add_joined_room(room_builder);
1426            })
1427            .instrument(bob_span)
1428            .await;
1429
1430        (alice, bob, matrix_mock_server, store)
1431    }
1432
1433    async fn prepare_room(
1434        matrix_mock_server: &MatrixMockServer,
1435        event_factory: &EventFactory,
1436        alice: &Client,
1437        bob: &Client,
1438        room_id: &RoomId,
1439    ) -> (Raw<AnySyncTimelineEvent>, Raw<ToDeviceEvent<ToDeviceEncryptedEventContent>>) {
        let alice_user_id = alice.user_id().unwrap();
        let bob_user_id = bob.user_id().unwrap();

        let alice_member_event = event_factory.member(alice_user_id).into_raw();
        let bob_member_event = event_factory.member(bob_user_id).into_raw();

        let room = alice
            .get_room(room_id)
            .expect("Alice should have access to the room now that we synced");

        // Alice will send a single event to the room, but this will trigger a to-device
        // message containing the room key to be sent as well. We capture both the event
        // and the to-device message.

        let event_type = "m.room.message";
        let content = json!({"body": "It's a secret to everybody", "msgtype": "m.text"});

        let event_id = event_id!("$some_id");
        let (event_receiver, mock) =
            matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id);
        let (_guard, room_key) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await;

        {
            let _guard = mock.mock_once().mount_as_scoped().await;

            matrix_mock_server
                .mock_get_members()
                .ok(vec![alice_member_event.clone(), bob_member_event.clone()])
                .mock_once()
                .mount()
                .await;

            room.send_raw(event_type, content)
                .await
                .expect("We should be able to send an initial message");
        };

        // Let us retrieve the captured event and to-device message.
        let event = event_receiver.await.expect("Alice should have sent the event by now");
        let room_key = room_key.await;

        (event, room_key)
    }

    #[async_test]
    async fn test_redecryptor() {
        let room_id = room_id!("!test:localhost");

        let event_factory = EventFactory::new().room(room_id);
        let (alice, bob, matrix_mock_server, _) = set_up_clients(room_id, true, false).await;

        let (event, room_key) =
            prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;

        // Let's now see what Bob's event cache does.

        let event_cache = bob.event_cache();
        let (room_cache, _) = event_cache
            .room(room_id)
            .await
            .expect("We should be able to get to the event cache for a specific room");

        let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // We regenerate the Olm machine to check that the room key stream is
        // recreated correctly.
        bob.inner
            .base_client
            .regenerate_olm(None)
            .await
            .expect("We should be able to regenerate the Olm machine");

        // Let us forward the event to Bob.
        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
            })
            .await;

        // Alright, Bob has received an update from the cache.

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                subscriber.recv()
        );

        // There should be a single new event, and it should be a UTD as we did not
        // receive the room key yet.
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Append { values });
        assert_eq!(values.len(), 1);
        assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });

        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);
        assert!(generic_stream.is_empty());

        // Now we send the room key to Bob.
        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_to_device_event(
                    room_key
                        .deserialize_as()
                        .expect("We should be able to deserialize the room key"),
                );
            })
            .await;

        // Bob should receive a new update from the cache.
        assert_let_timeout!(
            Duration::from_secs(1),
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                subscriber.recv()
        );

        // It should replace the UTD with a decrypted event.
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Set { index, value });
        assert_eq!(*index, 0);
        assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });

        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);
        assert!(generic_stream.is_empty());
    }

    #[async_test]
    async fn test_redecryptor_updating_encryption_info() {
        let bob_span = tracing::info_span!("bob");

        let room_id = room_id!("!test:localhost");

        let event_factory = EventFactory::new().room(room_id);
        let (alice, bob, matrix_mock_server, _) = set_up_clients(room_id, false, false).await;

        let (event, room_key) =
            prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;

        // Let's now see what Bob's event cache does.

        let event_cache = bob.event_cache();
        let (room_cache, _) = event_cache
            .room(room_id)
            .instrument(bob_span.clone())
            .await
            .expect("We should be able to get to the event cache for a specific room");

        let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Let us forward the event to Bob.
        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
            })
            .instrument(bob_span.clone())
            .await;

        // Alright, Bob has received an update from the cache.

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                subscriber.recv()
        );

        // There should be a single new event, and it should be a UTD as we did not
        // receive the room key yet.
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Append { values });
        assert_eq!(values.len(), 1);
        assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });

        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);
        assert!(generic_stream.is_empty());

        // Now we send the room key to Bob.
        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_to_device_event(
                    room_key
                        .deserialize_as()
                        .expect("We should be able to deserialize the room key"),
                );
            })
            .instrument(bob_span.clone())
            .await;

        // Bob should receive a new update from the cache.
        assert_let_timeout!(
            Duration::from_secs(1),
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                subscriber.recv()
        );

        // It should replace the UTD with a decrypted event.
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value });
        assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });

        let encryption_info = value.encryption_info().unwrap();
        assert_matches!(&encryption_info.verification_state, VerificationState::Unverified(_));

        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);
        assert!(generic_stream.is_empty());

        let session_id = encryption_info.session_id().unwrap().to_owned();
        let alice_user_id = alice.user_id().unwrap();

        // Alice now creates her cross-signing identity.
        alice
            .encryption()
            .bootstrap_cross_signing(None)
            .await
            .expect("Alice should be able to create the cross-signing keys");

        bob.update_tracked_users_for_testing([alice_user_id]).instrument(bob_span.clone()).await;
        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_change_device(alice_user_id);
            })
            .instrument(bob_span.clone())
            .await;

        bob.event_cache().request_decryption(DecryptionRetryRequest {
            room_id: room_id.into(),
            utd_session_ids: BTreeSet::new(),
            refresh_info_session_ids: BTreeSet::from([session_id]),
        });

        // Bob should again receive a new update from the cache, this time updating the
        // encryption info.
        assert_let_timeout!(
            Duration::from_secs(1),
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                subscriber.recv()
        );

        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value });
        assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });
        let encryption_info = value.encryption_info().unwrap();

        assert_matches!(
            &encryption_info.verification_state,
            VerificationState::Unverified(_),
            "The event should now know about the identity but still be unverified"
        );

        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);
        assert!(generic_stream.is_empty());
    }

    #[async_test]
    async fn test_event_is_redecrypted_even_if_key_arrives_while_event_processing() {
        let room_id = room_id!("!test:localhost");

        let event_factory = EventFactory::new().room(room_id);
        let (alice, bob, matrix_mock_server, delayed_store) =
            set_up_clients(room_id, true, true).await;

        let delayed_store = delayed_store.unwrap();

        let (event, room_key) =
            prepare_room(&matrix_mock_server, &event_factory, &alice, &bob, room_id).await;

        let event_cache = bob.event_cache();

        // Let's now see what Bob's event cache does.
        let (room_cache, _) = event_cache
            .room(room_id)
            .await
            .expect("We should be able to get to the event cache for a specific room");

        let (_, mut subscriber) = room_cache.subscribe().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Let us forward the event to Bob.
        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event));
            })
            .await;

        // Now we send the room key to Bob.
        matrix_mock_server
            .mock_sync()
            .ok_and_run(&bob, |builder| {
                builder.add_to_device_event(
                    room_key
                        .deserialize_as()
                        .expect("We should be able to deserialize the room key"),
                );
            })
            .await;

        info!("Stopping the delay");
        delayed_store.stop_delaying().await;

        // The first decryption attempt failed because the first sync (the one
        // with the event) did not contain the room key. The redecryptor
        // received the room key later.

        // Alright, Bob has received an update from the cache.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                subscriber.recv()
        );

        // There should be a single new event, and it should be a UTD as we did not
        // receive the room key yet.
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Append { values });
        assert_eq!(values.len(), 1);
        assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. });

        // And the companion generic update.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);

        // Bob should receive a new update from the cache.
        assert_let_timeout!(
            Duration::from_secs(1),
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
                subscriber.recv()
        );

        // It should replace the UTD with a decrypted event.
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Set { index, value });
        assert_eq!(*index, 0);
        assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. });

        // And the companion generic update.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
        );
        assert_eq!(expected_room_id, room_id);
        assert!(generic_stream.is_empty());
    }
}