matrix_sdk_crypto/gossiping/
machine.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// TODO
16//
17// handle the case where we can't create a session with a device. clearing our
18// stale key share requests that we'll never be able to handle.
19//
20// If we don't trust the device store an object that remembers the request and
21// let the users introspect that object.
22
23use std::{
24    collections::{btree_map::Entry, BTreeMap, BTreeSet},
25    mem,
26    sync::{
27        atomic::{AtomicBool, Ordering},
28        Arc,
29    },
30};
31
32use matrix_sdk_common::locks::RwLock as StdRwLock;
33use ruma::{
34    api::client::keys::claim_keys::v3::Request as KeysClaimRequest,
35    events::secret::request::{
36        RequestAction, SecretName, ToDeviceSecretRequestEvent as SecretRequestEvent,
37    },
38    DeviceId, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedTransactionId, OwnedUserId, RoomId,
39    TransactionId, UserId,
40};
41use tracing::{debug, field::debug, info, instrument, trace, warn, Span};
42use vodozemac::{megolm::SessionOrdering, Curve25519PublicKey};
43
44use super::{GossipRequest, GossippedSecret, RequestEvent, RequestInfo, SecretInfo, WaitQueue};
45use crate::{
46    error::{EventError, OlmError, OlmResult},
47    identities::IdentityManager,
48    olm::{InboundGroupSession, Session},
49    session_manager::GroupSessionCache,
50    store::{Changes, CryptoStoreError, SecretImportError, Store, StoreCache},
51    types::{
52        events::{
53            forwarded_room_key::ForwardedRoomKeyContent,
54            olm_v1::{DecryptedForwardedRoomKeyEvent, DecryptedSecretSendEvent},
55            room::encrypted::EncryptedEvent,
56            room_key_request::RoomKeyRequestEvent,
57            secret_send::SecretSendContent,
58            EventType,
59        },
60        requests::{OutgoingRequest, ToDeviceRequest},
61    },
62    Device, MegolmError,
63};
64
65#[derive(Clone, Debug)]
66pub(crate) struct GossipMachine {
67    inner: Arc<GossipMachineInner>,
68}
69
70#[derive(Debug)]
71pub(crate) struct GossipMachineInner {
72    store: Store,
73    #[cfg(feature = "automatic-room-key-forwarding")]
74    outbound_group_sessions: GroupSessionCache,
75    outgoing_requests: StdRwLock<BTreeMap<OwnedTransactionId, OutgoingRequest>>,
76    incoming_key_requests: StdRwLock<BTreeMap<RequestInfo, RequestEvent>>,
77    wait_queue: WaitQueue,
78    users_for_key_claim: Arc<StdRwLock<BTreeMap<OwnedUserId, BTreeSet<OwnedDeviceId>>>>,
79
80    /// Whether we should respond to incoming `m.room_key_request` messages.
81    room_key_forwarding_enabled: AtomicBool,
82
83    /// Whether we should send out `m.room_key_request` messages.
84    room_key_requests_enabled: AtomicBool,
85
86    identity_manager: IdentityManager,
87}
88
89impl GossipMachine {
90    pub fn new(
91        store: Store,
92        identity_manager: IdentityManager,
93        #[allow(unused)] outbound_group_sessions: GroupSessionCache,
94        users_for_key_claim: Arc<StdRwLock<BTreeMap<OwnedUserId, BTreeSet<OwnedDeviceId>>>>,
95    ) -> Self {
96        let room_key_forwarding_enabled =
97            AtomicBool::new(cfg!(feature = "automatic-room-key-forwarding"));
98
99        let room_key_requests_enabled =
100            AtomicBool::new(cfg!(feature = "automatic-room-key-forwarding"));
101
102        Self {
103            inner: Arc::new(GossipMachineInner {
104                store,
105                #[cfg(feature = "automatic-room-key-forwarding")]
106                outbound_group_sessions,
107                outgoing_requests: Default::default(),
108                incoming_key_requests: Default::default(),
109                wait_queue: WaitQueue::new(),
110                users_for_key_claim,
111                room_key_forwarding_enabled,
112                room_key_requests_enabled,
113                identity_manager,
114            }),
115        }
116    }
117
118    pub(crate) fn identity_manager(&self) -> &IdentityManager {
119        &self.inner.identity_manager
120    }
121
122    #[cfg(feature = "automatic-room-key-forwarding")]
123    pub fn set_room_key_forwarding_enabled(&self, enabled: bool) {
124        self.inner.room_key_forwarding_enabled.store(enabled, Ordering::SeqCst)
125    }
126
127    pub fn is_room_key_forwarding_enabled(&self) -> bool {
128        self.inner.room_key_forwarding_enabled.load(Ordering::SeqCst)
129    }
130
131    /// Configure whether we should send outgoing `m.room_key_request`s on
132    /// decryption failure.
133    #[cfg(feature = "automatic-room-key-forwarding")]
134    pub fn set_room_key_requests_enabled(&self, enabled: bool) {
135        self.inner.room_key_requests_enabled.store(enabled, Ordering::SeqCst)
136    }
137
138    /// Query whether we should send outgoing `m.room_key_request`s on
139    /// decryption failure.
140    pub fn are_room_key_requests_enabled(&self) -> bool {
141        self.inner.room_key_requests_enabled.load(Ordering::SeqCst)
142    }
143
144    /// Load stored outgoing requests that were not yet sent out.
145    async fn load_outgoing_requests(&self) -> Result<Vec<OutgoingRequest>, CryptoStoreError> {
146        Ok(self
147            .inner
148            .store
149            .get_unsent_secret_requests()
150            .await?
151            .into_iter()
152            .filter(|i| !i.sent_out)
153            .map(|info| info.to_request(self.device_id()))
154            .collect())
155    }
156
157    /// Our own user id.
158    pub fn user_id(&self) -> &UserId {
159        &self.inner.store.static_account().user_id
160    }
161
162    /// Our own device ID.
163    pub fn device_id(&self) -> &DeviceId {
164        &self.inner.store.static_account().device_id
165    }
166
167    pub async fn outgoing_to_device_requests(
168        &self,
169    ) -> Result<Vec<OutgoingRequest>, CryptoStoreError> {
170        let mut key_requests = self.load_outgoing_requests().await?;
171        let key_forwards: Vec<OutgoingRequest> =
172            self.inner.outgoing_requests.read().values().cloned().collect();
173        key_requests.extend(key_forwards);
174
175        let users_for_key_claim: BTreeMap<_, _> = self
176            .inner
177            .users_for_key_claim
178            .read()
179            .iter()
180            .map(|(key, value)| {
181                let device_map = value
182                    .iter()
183                    .map(|d| (d.to_owned(), OneTimeKeyAlgorithm::SignedCurve25519))
184                    .collect();
185
186                (key.to_owned(), device_map)
187            })
188            .collect();
189
190        if !users_for_key_claim.is_empty() {
191            let key_claim_request = KeysClaimRequest::new(users_for_key_claim);
192            key_requests.push(OutgoingRequest {
193                request_id: TransactionId::new(),
194                request: Arc::new(key_claim_request.into()),
195            });
196        }
197
198        Ok(key_requests)
199    }
200
201    /// Receive a room key request event.
202    pub fn receive_incoming_key_request(&self, event: &RoomKeyRequestEvent) {
203        self.receive_event(event.clone().into())
204    }
205
206    fn receive_event(&self, event: RequestEvent) {
207        // Some servers might send to-device events to ourselves if we send one
208        // out using a wildcard instead of a specific device as a recipient.
209        //
210        // Check if we're the sender of this request event and ignore it if
211        // so.
212        if event.sender() == self.user_id() && event.requesting_device_id() == self.device_id() {
213            trace!("Received a secret request event from ourselves, ignoring")
214        } else {
215            let request_info = event.to_request_info();
216            self.inner.incoming_key_requests.write().insert(request_info, event);
217        }
218    }
219
220    pub fn receive_incoming_secret_request(&self, event: &SecretRequestEvent) {
221        self.receive_event(event.clone().into())
222    }
223
224    /// Handle all the incoming key requests that are queued up and empty our
225    /// key request queue.
226    pub async fn collect_incoming_key_requests(
227        &self,
228        cache: &StoreCache,
229    ) -> OlmResult<Vec<Session>> {
230        let mut changed_sessions = Vec::new();
231
232        let incoming_key_requests = mem::take(&mut *self.inner.incoming_key_requests.write());
233
234        for event in incoming_key_requests.values() {
235            if let Some(s) = match event {
236                #[cfg(feature = "automatic-room-key-forwarding")]
237                RequestEvent::KeyShare(e) => Box::pin(self.handle_key_request(cache, e)).await?,
238                RequestEvent::Secret(e) => Box::pin(self.handle_secret_request(cache, e)).await?,
239                #[cfg(not(feature = "automatic-room-key-forwarding"))]
240                _ => None,
241            } {
242                changed_sessions.push(s);
243            }
244        }
245
246        Ok(changed_sessions)
247    }
248
249    /// Store the key share request for later, once we get an Olm session with
250    /// the given device [`retry_keyshare`](#method.retry_keyshare) should be
251    /// called.
252    fn handle_key_share_without_session(&self, device: Device, event: RequestEvent) {
253        self.inner
254            .users_for_key_claim
255            .write()
256            .entry(device.user_id().to_owned())
257            .or_default()
258            .insert(device.device_id().into());
259        self.inner.wait_queue.insert(&device, event);
260    }
261
262    /// Retry keyshares for a device that previously didn't have an Olm session
263    /// with us.
264    ///
265    /// This should be only called if the given user/device got a new Olm
266    /// session.
267    ///
268    /// # Arguments
269    ///
270    /// * `user_id` - The user id of the device that we created the Olm session
271    ///   with.
272    ///
273    /// * `device_id` - The device ID of the device that got the Olm session.
274    pub fn retry_keyshare(&self, user_id: &UserId, device_id: &DeviceId) {
275        if let Entry::Occupied(mut e) =
276            self.inner.users_for_key_claim.write().entry(user_id.to_owned())
277        {
278            e.get_mut().remove(device_id);
279
280            if e.get().is_empty() {
281                e.remove();
282            }
283        }
284
285        let mut incoming_key_requests = self.inner.incoming_key_requests.write();
286        for (key, event) in self.inner.wait_queue.remove(user_id, device_id) {
287            incoming_key_requests.entry(key).or_insert(event);
288        }
289    }
290
291    async fn handle_secret_request(
292        &self,
293        cache: &StoreCache,
294        event: &SecretRequestEvent,
295    ) -> OlmResult<Option<Session>> {
296        let secret_name = match &event.content.action {
297            RequestAction::Request(s) => s,
298            // We ignore cancellations here since there's nothing to serve.
299            RequestAction::RequestCancellation => return Ok(None),
300            action => {
301                warn!(?action, "Unknown secret request action");
302                return Ok(None);
303            }
304        };
305
306        let content = if let Some(secret) = self.inner.store.export_secret(secret_name).await? {
307            SecretSendContent::new(event.content.request_id.to_owned(), secret)
308        } else {
309            info!(?secret_name, "Can't serve a secret request, secret isn't found");
310            return Ok(None);
311        };
312
313        let device =
314            self.inner.store.get_device(&event.sender, &event.content.requesting_device_id).await?;
315
316        if let Some(device) = device {
317            if device.user_id() == self.user_id() {
318                if device.is_verified() {
319                    info!(
320                        user_id = ?device.user_id(),
321                        device_id = ?device.device_id(),
322                        ?secret_name,
323                        "Sharing a secret with a device",
324                    );
325
326                    match self.share_secret(&device, content).await {
327                        Ok(s) => Ok(Some(s)),
328                        Err(OlmError::MissingSession) => {
329                            info!(
330                                user_id = ?device.user_id(),
331                                device_id = ?device.device_id(),
332                                ?secret_name,
333                                "Secret request is missing an Olm session, \
334                                putting the request in the wait queue",
335                            );
336                            self.handle_key_share_without_session(device, event.clone().into());
337
338                            Ok(None)
339                        }
340                        Err(e) => Err(e),
341                    }
342                } else {
343                    info!(
344                        user_id = ?device.user_id(),
345                        device_id = ?device.device_id(),
346                        ?secret_name,
347                        "Received a secret request that we won't serve, the device isn't trusted",
348                    );
349
350                    Ok(None)
351                }
352            } else {
353                info!(
354                    user_id = ?device.user_id(),
355                    device_id = ?device.device_id(),
356                    ?secret_name,
357                    "Received a secret request that we won't serve, the device doesn't belong to us",
358                );
359
360                Ok(None)
361            }
362        } else {
363            warn!(
364                user_id = ?event.sender,
365                device_id = ?event.content.requesting_device_id,
366                ?secret_name,
367                "Received a secret request from an unknown device",
368            );
369            self.inner
370                .identity_manager
371                .key_query_manager
372                .synced(cache)
373                .await?
374                .mark_user_as_changed(&event.sender)
375                .await?;
376
377            Ok(None)
378        }
379    }
380
381    /// Try to encrypt the given `InboundGroupSession` for the given `Device` as
382    /// a forwarded room key.
383    ///
384    /// This method might fail if we do not share an 1-to-1 Olm session with the
385    /// given `Device`, in that case we're going to queue up an
386    /// `/keys/claim` request to be sent out and retry once the 1-to-1 Olm
387    /// session has been established.
388    #[cfg(feature = "automatic-room-key-forwarding")]
389    async fn try_to_forward_room_key(
390        &self,
391        event: &RoomKeyRequestEvent,
392        device: Device,
393        session: &InboundGroupSession,
394        message_index: Option<u32>,
395    ) -> OlmResult<Option<Session>> {
396        info!(?message_index, "Serving a room key request",);
397
398        match self.forward_room_key(session, &device, message_index).await {
399            Ok(s) => Ok(Some(s)),
400            Err(OlmError::MissingSession) => {
401                info!(
402                    "Key request is missing an Olm session, putting the request in the wait queue",
403                );
404                self.handle_key_share_without_session(device, event.to_owned().into());
405
406                Ok(None)
407            }
408            Err(OlmError::SessionExport(e)) => {
409                warn!(
410                    "Can't serve a room key request, the session \
411                     can't be exported into a forwarded room key: {e:?}",
412                );
413                Ok(None)
414            }
415            Err(e) => Err(e),
416        }
417    }
418
419    /// Answer a room key request after we found the matching
420    /// `InboundGroupSession`.
421    #[cfg(feature = "automatic-room-key-forwarding")]
422    async fn answer_room_key_request(
423        &self,
424        cache: &StoreCache,
425        event: &RoomKeyRequestEvent,
426        session: &InboundGroupSession,
427    ) -> OlmResult<Option<Session>> {
428        use super::KeyForwardDecision;
429
430        let device =
431            self.inner.store.get_device(&event.sender, &event.content.requesting_device_id).await?;
432
433        let Some(device) = device else {
434            warn!("Received a key request from an unknown device");
435            self.identity_manager()
436                .key_query_manager
437                .synced(cache)
438                .await?
439                .mark_user_as_changed(&event.sender)
440                .await?;
441
442            return Ok(None);
443        };
444
445        match self.should_share_key(&device, session).await {
446            Ok(message_index) => {
447                self.try_to_forward_room_key(event, device, session, message_index).await
448            }
449            Err(e) => {
450                if let KeyForwardDecision::ChangedSenderKey = e {
451                    warn!(
452                        "Received a key request from a device that changed \
453                         their Curve25519 sender key"
454                    );
455                } else {
456                    debug!(
457                        reason = ?e,
458                        "Received a key request that we won't serve",
459                    );
460                }
461
462                Ok(None)
463            }
464        }
465    }
466
467    #[cfg(feature = "automatic-room-key-forwarding")]
468    #[tracing::instrument(
469        skip_all,
470        fields(
471            user_id = ?event.sender,
472            device_id = ?event.content.requesting_device_id,
473            ?room_id,
474            session_id
475        )
476    )]
477    async fn handle_supported_key_request(
478        &self,
479        cache: &StoreCache,
480        event: &RoomKeyRequestEvent,
481        room_id: &RoomId,
482        session_id: &str,
483    ) -> OlmResult<Option<Session>> {
484        let session = self.inner.store.get_inbound_group_session(room_id, session_id).await?;
485
486        if let Some(s) = session {
487            self.answer_room_key_request(cache, event, &s).await
488        } else {
489            debug!("Received a room key request for an unknown inbound group session",);
490
491            Ok(None)
492        }
493    }
494
495    /// Handle a single incoming key request.
496    #[cfg(feature = "automatic-room-key-forwarding")]
497    async fn handle_key_request(
498        &self,
499        cache: &StoreCache,
500        event: &RoomKeyRequestEvent,
501    ) -> OlmResult<Option<Session>> {
502        use crate::types::events::room_key_request::{Action, RequestedKeyInfo};
503
504        if self.inner.room_key_forwarding_enabled.load(Ordering::SeqCst) {
505            match &event.content.action {
506                Action::Request(info) => match info {
507                    RequestedKeyInfo::MegolmV1AesSha2(i) => {
508                        self.handle_supported_key_request(cache, event, &i.room_id, &i.session_id)
509                            .await
510                    }
511                    #[cfg(feature = "experimental-algorithms")]
512                    RequestedKeyInfo::MegolmV2AesSha2(i) => {
513                        self.handle_supported_key_request(cache, event, &i.room_id, &i.session_id)
514                            .await
515                    }
516                    RequestedKeyInfo::Unknown(i) => {
517                        debug!(
518                            sender = ?event.sender,
519                            algorithm = ?i.algorithm,
520                            "Received a room key request for a unsupported algorithm"
521                        );
522                        Ok(None)
523                    }
524                },
525                // We ignore cancellations here since there's nothing to serve.
526                Action::Cancellation => Ok(None),
527            }
528        } else {
529            debug!(
530                sender = ?event.sender,
531                "Received a room key request, but room key forwarding has been turned off"
532            );
533            Ok(None)
534        }
535    }
536
537    async fn share_secret(
538        &self,
539        device: &Device,
540        content: SecretSendContent,
541    ) -> OlmResult<Session> {
542        let event_type = content.event_type().to_owned();
543        let (used_session, content) = device.encrypt(&event_type, content).await?;
544
545        let encrypted_event_type = content.event_type().to_owned();
546
547        let request = ToDeviceRequest::new(
548            device.user_id(),
549            device.device_id().to_owned(),
550            &encrypted_event_type,
551            content.cast(),
552        );
553
554        let request = OutgoingRequest {
555            request_id: request.txn_id.clone(),
556            request: Arc::new(request.into()),
557        };
558        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
559
560        Ok(used_session)
561    }
562
563    #[cfg(feature = "automatic-room-key-forwarding")]
564    async fn forward_room_key(
565        &self,
566        session: &InboundGroupSession,
567        device: &Device,
568        message_index: Option<u32>,
569    ) -> OlmResult<Session> {
570        let (used_session, content) =
571            device.encrypt_room_key_for_forwarding(session.clone(), message_index).await?;
572
573        let event_type = content.event_type().to_owned();
574
575        let request = ToDeviceRequest::new(
576            device.user_id(),
577            device.device_id().to_owned(),
578            &event_type,
579            content.cast(),
580        );
581
582        let request = OutgoingRequest {
583            request_id: request.txn_id.clone(),
584            request: Arc::new(request.into()),
585        };
586        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
587
588        Ok(used_session)
589    }
590
591    /// Check if it's ok to share a session with the given device.
592    ///
593    /// The logic for this is currently as follows:
594    ///
595    /// * Share the session in full, starting from the earliest known index, if
596    ///   the requesting device is our own, trusted (verified) device.
597    ///
598    /// * For other requesting devices, share only a limited session and only if
599    ///   we originally shared with that device because it was present when the
600    ///   message was initially sent. By limited, we mean that the session will
601    ///   not be shared in full, but only from the message index at that moment.
602    ///   Since this information is recorded in the outbound session, we need to
603    ///   have it for this to work.
604    ///
605    /// * In all other cases, refuse to share the session.
606    ///
607    /// # Arguments
608    ///
609    /// * `device` - The device that is requesting a session from us.
610    ///
611    /// * `session` - The session that was requested to be shared.
612    ///
613    /// # Return value
614    ///
615    /// A `Result` representing whether we should share the session:
616    ///
617    /// - `Ok(None)`: Should share the entire session, starting with the
618    ///   earliest known index.
619    /// - `Ok(Some(i))`: Should share the session, but only starting from index
620    ///   i.
621    /// - `Err(x)`: Should *refuse* to share the session. `x` is the reason for
622    ///   the refusal.
623    #[cfg(feature = "automatic-room-key-forwarding")]
624    async fn should_share_key(
625        &self,
626        device: &Device,
627        session: &InboundGroupSession,
628    ) -> Result<Option<u32>, super::KeyForwardDecision> {
629        use super::KeyForwardDecision;
630        use crate::olm::ShareState;
631
632        let outbound_session = self
633            .inner
634            .outbound_group_sessions
635            .get_or_load(session.room_id())
636            .await
637            .filter(|outgoing_session| outgoing_session.session_id() == session.session_id());
638
639        // If this is our own, verified device, we share the entire session from the
640        // earliest known index.
641        if device.user_id() == self.user_id() && device.is_verified() {
642            Ok(None)
643        // Otherwise, if the records show we previously shared with this device,
644        // we'll reshare the session from the index we previously shared
645        // at. For this, we need an outbound session because this
646        // information is recorded there.
647        } else if let Some(outbound) = outbound_session {
648            match outbound.is_shared_with(&device.inner) {
649                ShareState::Shared { message_index, olm_wedging_index: _ } => {
650                    Ok(Some(message_index))
651                }
652                ShareState::SharedButChangedSenderKey => Err(KeyForwardDecision::ChangedSenderKey),
653                ShareState::NotShared => Err(KeyForwardDecision::OutboundSessionNotShared),
654            }
655        // Otherwise, there's not enough info to decide if we can safely share
656        // the session.
657        } else if device.user_id() == self.user_id() {
658            Err(KeyForwardDecision::UntrustedDevice)
659        } else {
660            Err(KeyForwardDecision::MissingOutboundSession)
661        }
662    }
663
664    /// Check if it's ok, or rather if it makes sense to automatically request
665    /// a key from our other devices.
666    ///
667    /// # Arguments
668    ///
669    /// * `key_info` - The info of our key request containing information about
670    ///   the key we wish to request.
671    #[cfg(feature = "automatic-room-key-forwarding")]
672    async fn should_request_key(&self, key_info: &SecretInfo) -> Result<bool, CryptoStoreError> {
673        if self.inner.room_key_requests_enabled.load(Ordering::SeqCst) {
674            let request = self.inner.store.get_secret_request_by_info(key_info).await?;
675
676            // Don't send out duplicate requests, users can re-request them if they
677            // think a second request might succeed.
678            if request.is_none() {
679                let devices = self.inner.store.get_user_devices(self.user_id()).await?;
680
681                // Devices will only respond to key requests if the devices are
682                // verified, if the device isn't verified by us it's unlikely that
683                // we're verified by them either. Don't request keys if there isn't
684                // at least one verified device.
685                Ok(devices.is_any_verified())
686            } else {
687                Ok(false)
688            }
689        } else {
690            Ok(false)
691        }
692    }
693
694    /// Create a new outgoing key request for the key with the given session id.
695    ///
696    /// This will queue up a new to-device request and store the key info so
697    /// once we receive a forwarded room key we can check that it matches the
698    /// key we requested.
699    ///
700    /// This method will return a cancel request and a new key request if the
701    /// key was already requested, otherwise it will return just the key
702    /// request.
703    ///
704    /// # Arguments
705    ///
706    /// * `room_id` - The id of the room where the key is used in.
707    ///
708    /// * `event` - The event for which we would like to request the room key.
709    pub async fn request_key(
710        &self,
711        room_id: &RoomId,
712        event: &EncryptedEvent,
713    ) -> Result<(Option<OutgoingRequest>, OutgoingRequest), MegolmError> {
714        let secret_info =
715            event.room_key_info(room_id).ok_or(EventError::UnsupportedAlgorithm)?.into();
716
717        let request = self.inner.store.get_secret_request_by_info(&secret_info).await?;
718
719        if let Some(request) = request {
720            let cancel = request.to_cancellation(self.device_id());
721            let request = request.to_request(self.device_id());
722
723            Ok((Some(cancel), request))
724        } else {
725            let request = self.request_key_helper(secret_info).await?;
726
727            Ok((None, request))
728        }
729    }
730
731    /// Create outgoing secret requests for the given
732    pub fn request_missing_secrets(
733        own_user_id: &UserId,
734        secret_names: Vec<SecretName>,
735    ) -> Vec<GossipRequest> {
736        if !secret_names.is_empty() {
737            info!(?secret_names, "Creating new outgoing secret requests");
738
739            secret_names
740                .into_iter()
741                .map(|n| GossipRequest::from_secret_name(own_user_id.to_owned(), n))
742                .collect()
743        } else {
744            trace!("No secrets are missing from our store, not requesting them");
745            vec![]
746        }
747    }
748
749    async fn request_key_helper(
750        &self,
751        key_info: SecretInfo,
752    ) -> Result<OutgoingRequest, CryptoStoreError> {
753        let request = GossipRequest {
754            request_recipient: self.user_id().to_owned(),
755            request_id: TransactionId::new(),
756            info: key_info,
757            sent_out: false,
758        };
759
760        let outgoing_request = request.to_request(self.device_id());
761        self.save_outgoing_key_info(request).await?;
762
763        Ok(outgoing_request)
764    }
765
766    /// Create a new outgoing key request for the key with the given session id.
767    ///
768    /// This will queue up a new to-device request and store the key info so
769    /// once we receive a forwarded room key we can check that it matches the
770    /// key we requested.
771    ///
772    /// This does nothing if a request for this key has already been sent out.
773    ///
774    /// # Arguments
775    /// * `room_id` - The id of the room where the key is used in.
776    ///
777    /// * `event` - The event for which we would like to request the room key.
778    #[cfg(feature = "automatic-room-key-forwarding")]
779    pub async fn create_outgoing_key_request(
780        &self,
781        room_id: &RoomId,
782        event: &EncryptedEvent,
783    ) -> Result<bool, CryptoStoreError> {
784        if let Some(info) = event.room_key_info(room_id).map(|i| i.into()) {
785            if self.should_request_key(&info).await? {
786                // Size of the request_key_helper future should not impact this
787                // async fn since it is likely enough that this branch won't be
788                // entered.
789                Box::pin(self.request_key_helper(info)).await?;
790                return Ok(true);
791            }
792        }
793
794        Ok(false)
795    }
796
797    /// Save an outgoing key info.
798    async fn save_outgoing_key_info(&self, info: GossipRequest) -> Result<(), CryptoStoreError> {
799        let mut changes = Changes::default();
800        changes.key_requests.push(info);
801        self.inner.store.save_changes(changes).await?;
802
803        Ok(())
804    }
805
806    /// Delete the given outgoing key info.
807    async fn delete_key_info(&self, info: &GossipRequest) -> Result<(), CryptoStoreError> {
808        self.inner.store.delete_outgoing_secret_requests(&info.request_id).await
809    }
810
811    /// Mark the outgoing request as sent.
812    pub async fn mark_outgoing_request_as_sent(
813        &self,
814        id: &TransactionId,
815    ) -> Result<(), CryptoStoreError> {
816        let info = self.inner.store.get_outgoing_secret_requests(id).await?;
817
818        if let Some(mut info) = info {
819            trace!(
820                recipient = ?info.request_recipient,
821                request_type = info.request_type(),
822                request_id = ?info.request_id,
823                "Marking outgoing secret request as sent"
824            );
825            info.sent_out = true;
826            self.save_outgoing_key_info(info).await?;
827        }
828
829        self.inner.outgoing_requests.write().remove(id);
830
831        Ok(())
832    }
833
834    /// Mark the given outgoing key info as done.
835    ///
836    /// This will queue up a request cancellation.
837    async fn mark_as_done(&self, key_info: &GossipRequest) -> Result<(), CryptoStoreError> {
838        trace!(
839            recipient = ?key_info.request_recipient,
840            request_type = key_info.request_type(),
841            request_id = ?key_info.request_id,
842            "Successfully received a secret, removing the request"
843        );
844
845        self.inner.outgoing_requests.write().remove(&key_info.request_id);
846        // TODO return the key info instead of deleting it so the sync handler
847        // can delete it in one transaction.
848        self.delete_key_info(key_info).await?;
849
850        let request = key_info.to_cancellation(self.device_id());
851        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
852
853        Ok(())
854    }
855
856    async fn accept_secret(
857        &self,
858        secret: GossippedSecret,
859        changes: &mut Changes,
860    ) -> Result<(), CryptoStoreError> {
861        if secret.secret_name != SecretName::RecoveryKey {
862            match self.inner.store.import_secret(&secret).await {
863                Ok(_) => self.mark_as_done(&secret.gossip_request).await?,
864                // If this is a store error propagate it up the call stack.
865                Err(SecretImportError::Store(e)) => return Err(e),
866                // Otherwise warn that there was something wrong with the
867                // secret.
868                Err(e) => {
869                    warn!(
870                        secret_name = ?secret.secret_name,
871                        error = ?e,
872                        "Error while importing a secret"
873                    );
874                }
875            }
876        } else {
877            // We would need to fire out a request to figure out if this backup decryption
878            // key is the one that is used for the current backup and if the
879            // backup is trusted.
880            //
881            // So we put the secret into our inbox. Later users can inspect the contents of
882            // the inbox and decide if they want to activate the backup.
883            info!("Received a backup decryption key, storing it into the secret inbox.");
884            changes.secrets.push(secret);
885        }
886
887        Ok(())
888    }
889
890    async fn receive_secret(
891        &self,
892        cache: &StoreCache,
893        sender_key: Curve25519PublicKey,
894        secret: GossippedSecret,
895        changes: &mut Changes,
896    ) -> Result<(), CryptoStoreError> {
897        debug!("Received a m.secret.send event with a matching request");
898
899        if let Some(device) =
900            self.inner.store.get_device_from_curve_key(&secret.event.sender, sender_key).await?
901        {
902            // Only accept secrets from one of our own trusted devices.
903            if device.user_id() == self.user_id() && device.is_verified() {
904                self.accept_secret(secret, changes).await?;
905            } else {
906                warn!("Received a m.secret.send event from another user or from unverified device");
907            }
908        } else {
909            warn!("Received a m.secret.send event from an unknown device");
910
911            self.identity_manager()
912                .key_query_manager
913                .synced(cache)
914                .await?
915                .mark_user_as_changed(&secret.event.sender)
916                .await?;
917        }
918
919        Ok(())
920    }
921
922    #[instrument(skip_all, fields(sender_key, sender = ?event.sender, request_id = ?event.content.request_id, secret_name))]
923    pub async fn receive_secret_event(
924        &self,
925        cache: &StoreCache,
926        sender_key: Curve25519PublicKey,
927        event: &DecryptedSecretSendEvent,
928        changes: &mut Changes,
929    ) -> Result<Option<SecretName>, CryptoStoreError> {
930        debug!("Received a m.secret.send event");
931
932        let request_id = &event.content.request_id;
933
934        let name = if let Some(request) =
935            self.inner.store.get_outgoing_secret_requests(request_id).await?
936        {
937            match &request.info {
938                SecretInfo::KeyRequest(_) => {
939                    warn!("Received a m.secret.send event but the request was for a room key");
940
941                    None
942                }
943                SecretInfo::SecretRequest(secret_name) => {
944                    Span::current().record("secret_name", debug(secret_name));
945
946                    let secret_name = secret_name.to_owned();
947
948                    let secret = GossippedSecret {
949                        secret_name: secret_name.to_owned(),
950                        event: event.to_owned(),
951                        gossip_request: request,
952                    };
953
954                    self.receive_secret(cache, sender_key, secret, changes).await?;
955
956                    Some(secret_name)
957                }
958            }
959        } else {
960            warn!("Received a m.secret.send event, but no matching request was found");
961            None
962        };
963
964        Ok(name)
965    }
966
967    async fn accept_forwarded_room_key(
968        &self,
969        info: &GossipRequest,
970        sender_key: Curve25519PublicKey,
971        event: &DecryptedForwardedRoomKeyEvent,
972    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
973        match InboundGroupSession::try_from(event) {
974            Ok(session) => {
975                if self.inner.store.compare_group_session(&session).await?
976                    == SessionOrdering::Better
977                {
978                    self.mark_as_done(info).await?;
979
980                    info!(
981                        ?sender_key,
982                        claimed_sender_key = ?session.sender_key(),
983                        room_id = ?session.room_id(),
984                        session_id = session.session_id(),
985                        algorithm = ?session.algorithm(),
986                        "Received a forwarded room key",
987                    );
988
989                    Ok(Some(session))
990                } else {
991                    info!(
992                        ?sender_key,
993                        claimed_sender_key = ?session.sender_key(),
994                        room_id = ?session.room_id(),
995                        session_id = session.session_id(),
996                        algorithm = ?session.algorithm(),
997                        "Received a forwarded room key but we already have a better version of it",
998                    );
999
1000                    Ok(None)
1001                }
1002            }
1003            Err(e) => {
1004                warn!(?sender_key, "Couldn't create a group session from a received room key");
1005                Err(e.into())
1006            }
1007        }
1008    }
1009
1010    async fn should_accept_forward(
1011        &self,
1012        info: &GossipRequest,
1013        sender_key: Curve25519PublicKey,
1014    ) -> Result<bool, CryptoStoreError> {
1015        let device =
1016            self.inner.store.get_device_from_curve_key(&info.request_recipient, sender_key).await?;
1017
1018        if let Some(device) = device {
1019            Ok(device.user_id() == self.user_id() && device.is_verified())
1020        } else {
1021            Ok(false)
1022        }
1023    }
1024
1025    /// Receive a forwarded room key event that was sent using any of our
1026    /// supported content types.
1027    async fn receive_supported_keys(
1028        &self,
1029        sender_key: Curve25519PublicKey,
1030        event: &DecryptedForwardedRoomKeyEvent,
1031    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
1032        let Some(info) = event.room_key_info() else {
1033            warn!(
1034                sender_key = sender_key.to_base64(),
1035                algorithm = ?event.content.algorithm(),
1036                "Received a forwarded room key with an unsupported algorithm",
1037            );
1038            return Ok(None);
1039        };
1040
1041        let Some(request) =
1042            self.inner.store.get_secret_request_by_info(&info.clone().into()).await?
1043        else {
1044            warn!(
1045                sender_key = ?sender_key,
1046                room_id = ?info.room_id(),
1047                session_id = info.session_id(),
1048                sender_key = ?sender_key,
1049                algorithm = ?info.algorithm(),
1050                "Received a forwarded room key that we didn't request",
1051            );
1052            return Ok(None);
1053        };
1054
1055        if self.should_accept_forward(&request, sender_key).await? {
1056            self.accept_forwarded_room_key(&request, sender_key, event).await
1057        } else {
1058            warn!(
1059                ?sender_key,
1060                room_id = ?info.room_id(),
1061                session_id = info.session_id(),
1062                "Received a forwarded room key from an unknown device, or \
1063                 from a device that the key request recipient doesn't own",
1064            );
1065
1066            Ok(None)
1067        }
1068    }
1069
1070    /// Receive a forwarded room key event.
1071    pub async fn receive_forwarded_room_key(
1072        &self,
1073        sender_key: Curve25519PublicKey,
1074        event: &DecryptedForwardedRoomKeyEvent,
1075    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
1076        match event.content {
1077            ForwardedRoomKeyContent::MegolmV1AesSha2(_) => {
1078                self.receive_supported_keys(sender_key, event).await
1079            }
1080            #[cfg(feature = "experimental-algorithms")]
1081            ForwardedRoomKeyContent::MegolmV2AesSha2(_) => {
1082                self.receive_supported_keys(sender_key, event).await
1083            }
1084            ForwardedRoomKeyContent::Unknown(_) => {
1085                warn!(
1086                    sender = event.sender.as_str(),
1087                    sender_key = sender_key.to_base64(),
1088                    algorithm = ?event.content.algorithm(),
1089                    "Received a forwarded room key with an unsupported algorithm",
1090                );
1091
1092                Ok(None)
1093            }
1094        }
1095    }
1096}
1097
1098#[cfg(test)]
1099mod tests {
1100    use std::sync::Arc;
1101
1102    #[cfg(feature = "automatic-room-key-forwarding")]
1103    use assert_matches::assert_matches;
1104    use matrix_sdk_test::{async_test, message_like_event_content};
1105    use ruma::{
1106        device_id, event_id,
1107        events::{
1108            secret::request::{RequestAction, SecretName, ToDeviceSecretRequestEventContent},
1109            ToDeviceEvent as RumaToDeviceEvent,
1110        },
1111        room_id,
1112        serde::Raw,
1113        user_id, DeviceId, RoomId, UserId,
1114    };
1115    use tokio::sync::Mutex;
1116
1117    use super::GossipMachine;
1118    #[cfg(feature = "automatic-room-key-forwarding")]
1119    use crate::{
1120        gossiping::KeyForwardDecision,
1121        olm::OutboundGroupSession,
1122        store::{CryptoStore, DeviceChanges},
1123        types::requests::AnyOutgoingRequest,
1124        types::{
1125            events::{
1126                forwarded_room_key::ForwardedRoomKeyContent, olm_v1::AnyDecryptedOlmEvent,
1127                olm_v1::DecryptedOlmV1Event,
1128            },
1129            EventEncryptionAlgorithm,
1130        },
1131        EncryptionSettings,
1132    };
1133    use crate::{
1134        identities::{DeviceData, IdentityManager, LocalTrust},
1135        olm::{Account, PrivateCrossSigningIdentity},
1136        session_manager::GroupSessionCache,
1137        store::{Changes, CryptoStoreWrapper, MemoryStore, PendingChanges, Store},
1138        types::events::room::encrypted::{
1139            EncryptedEvent, EncryptedToDeviceEvent, RoomEncryptedEventContent,
1140        },
1141        verification::VerificationMachine,
1142    };
1143
1144    fn alice_id() -> &'static UserId {
1145        user_id!("@alice:example.org")
1146    }
1147
1148    fn alice_device_id() -> &'static DeviceId {
1149        device_id!("JLAFKJWSCS")
1150    }
1151
1152    fn bob_id() -> &'static UserId {
1153        user_id!("@bob:example.org")
1154    }
1155
1156    fn bob_device_id() -> &'static DeviceId {
1157        device_id!("ILMLKASTES")
1158    }
1159
1160    fn alice2_device_id() -> &'static DeviceId {
1161        device_id!("ILMLKASTES")
1162    }
1163
1164    fn room_id() -> &'static RoomId {
1165        room_id!("!test:example.org")
1166    }
1167
1168    fn account() -> Account {
1169        Account::with_device_id(alice_id(), alice_device_id())
1170    }
1171
1172    fn bob_account() -> Account {
1173        Account::with_device_id(bob_id(), bob_device_id())
1174    }
1175
1176    fn alice_2_account() -> Account {
1177        Account::with_device_id(alice_id(), alice2_device_id())
1178    }
1179
1180    #[cfg(feature = "automatic-room-key-forwarding")]
1181    async fn gossip_machine_test_helper(user_id: &UserId) -> GossipMachine {
1182        let user_id = user_id.to_owned();
1183        let device_id = DeviceId::new();
1184
1185        let store = Arc::new(store_with_account_helper(&user_id, &device_id).await);
1186        let static_data = store.load_account().await.unwrap().unwrap().static_data;
1187        let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(alice_id())));
1188        let verification =
1189            VerificationMachine::new(static_data.clone(), identity.clone(), store.clone());
1190        let store = Store::new(static_data, identity, store, verification);
1191
1192        let session_cache = GroupSessionCache::new(store.clone());
1193        let identity_manager = IdentityManager::new(store.clone());
1194
1195        GossipMachine::new(store, identity_manager, session_cache, Default::default())
1196    }
1197
1198    #[cfg(feature = "automatic-room-key-forwarding")]
1199    async fn store_with_account_helper(
1200        user_id: &UserId,
1201        device_id: &DeviceId,
1202    ) -> CryptoStoreWrapper {
1203        // Properly create the store by first saving the own device and then the account
1204        // data.
1205        let account = Account::with_device_id(user_id, device_id);
1206        let device = DeviceData::from_account(&account);
1207        device.set_trust_state(LocalTrust::Verified);
1208
1209        let changes = Changes {
1210            devices: DeviceChanges { new: vec![device], ..Default::default() },
1211            ..Default::default()
1212        };
1213        let mem_store = MemoryStore::new();
1214        mem_store.save_changes(changes).await.unwrap();
1215        mem_store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();
1216
1217        CryptoStoreWrapper::new(user_id, device_id, mem_store)
1218    }
1219
1220    async fn get_machine_test_helper() -> GossipMachine {
1221        let user_id = alice_id().to_owned();
1222        let account = Account::with_device_id(&user_id, alice_device_id());
1223        let device = DeviceData::from_account(&account);
1224        let another_device =
1225            DeviceData::from_account(&Account::with_device_id(&user_id, alice2_device_id()));
1226
1227        let store =
1228            Arc::new(CryptoStoreWrapper::new(&user_id, account.device_id(), MemoryStore::new()));
1229        let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(alice_id())));
1230        let verification =
1231            VerificationMachine::new(account.static_data.clone(), identity.clone(), store.clone());
1232
1233        let store = Store::new(account.static_data().clone(), identity, store, verification);
1234        store.save_device_data(&[device, another_device]).await.unwrap();
1235        store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();
1236        let session_cache = GroupSessionCache::new(store.clone());
1237
1238        let identity_manager = IdentityManager::new(store.clone());
1239
1240        GossipMachine::new(store, identity_manager, session_cache, Default::default())
1241    }
1242
1243    #[cfg(feature = "automatic-room-key-forwarding")]
1244    async fn machines_for_key_share_test_helper(
1245        other_machine_owner: &UserId,
1246        create_sessions: bool,
1247        algorithm: EventEncryptionAlgorithm,
1248    ) -> (GossipMachine, OutboundGroupSession, GossipMachine) {
1249        use crate::olm::SenderData;
1250
1251        let alice_machine = get_machine_test_helper().await;
1252        let alice_device = DeviceData::from_account(
1253            &alice_machine.inner.store.cache().await.unwrap().account().await.unwrap(),
1254        );
1255
1256        let bob_machine = gossip_machine_test_helper(other_machine_owner).await;
1257
1258        let bob_device = DeviceData::from_account(
1259            #[allow(clippy::explicit_auto_deref)] // clippy's wrong
1260            &*bob_machine.inner.store.cache().await.unwrap().account().await.unwrap(),
1261        );
1262
1263        // We need a trusted device, otherwise we won't request keys
1264        let second_device = DeviceData::from_account(&alice_2_account());
1265        second_device.set_trust_state(LocalTrust::Verified);
1266        bob_device.set_trust_state(LocalTrust::Verified);
1267        alice_machine.inner.store.save_device_data(&[bob_device, second_device]).await.unwrap();
1268        bob_machine.inner.store.save_device_data(&[alice_device.clone()]).await.unwrap();
1269
1270        if create_sessions {
1271            // Create Olm sessions for our two accounts.
1272            let (alice_session, bob_session) = alice_machine
1273                .inner
1274                .store
1275                .with_transaction(|mut atr| async {
1276                    let sessions = bob_machine
1277                        .inner
1278                        .store
1279                        .with_transaction(|mut btr| async {
1280                            let alice_account = atr.account().await?;
1281                            let bob_account = btr.account().await?;
1282                            let sessions =
1283                                alice_account.create_session_for_test_helper(bob_account).await;
1284                            Ok((btr, sessions))
1285                        })
1286                        .await?;
1287                    Ok((atr, sessions))
1288                })
1289                .await
1290                .unwrap();
1291
1292            // Populate our stores with Olm sessions and a Megolm session.
1293
1294            alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
1295            bob_machine.inner.store.save_sessions(&[bob_session]).await.unwrap();
1296        }
1297
1298        let settings = EncryptionSettings { algorithm, ..Default::default() };
1299        let (group_session, inbound_group_session) = bob_machine
1300            .inner
1301            .store
1302            .static_account()
1303            .create_group_session_pair(room_id(), settings, SenderData::unknown())
1304            .await
1305            .unwrap();
1306
1307        bob_machine
1308            .inner
1309            .store
1310            .save_inbound_group_sessions(&[inbound_group_session])
1311            .await
1312            .unwrap();
1313
1314        let content = group_session.encrypt("m.dummy", &message_like_event_content!({})).await;
1315        let event = wrap_encrypted_content(bob_machine.user_id(), content);
1316
1317        // Alice wants to request the outbound group session from bob.
1318        assert!(
1319            alice_machine.create_outgoing_key_request(room_id(), &event,).await.unwrap(),
1320            "We should request a room key"
1321        );
1322
1323        group_session
1324            .mark_shared_with(
1325                alice_device.user_id(),
1326                alice_device.device_id(),
1327                alice_device.curve25519_key().unwrap(),
1328            )
1329            .await;
1330
1331        // Put the outbound session into bobs store.
1332        bob_machine.inner.outbound_group_sessions.insert(group_session.clone());
1333
1334        (alice_machine, group_session, bob_machine)
1335    }
1336
1337    fn extract_content<'a>(
1338        recipient: &UserId,
1339        request: &'a crate::types::requests::OutgoingRequest,
1340    ) -> &'a Raw<ruma::events::AnyToDeviceEventContent> {
1341        request
1342            .request()
1343            .to_device()
1344            .expect("The request should be always a to-device request")
1345            .messages
1346            .get(recipient)
1347            .unwrap()
1348            .values()
1349            .next()
1350            .unwrap()
1351    }
1352
1353    fn wrap_encrypted_content(
1354        sender: &UserId,
1355        content: Raw<RoomEncryptedEventContent>,
1356    ) -> EncryptedEvent {
1357        let content = content.deserialize().unwrap();
1358
1359        EncryptedEvent {
1360            sender: sender.to_owned(),
1361            event_id: event_id!("$143273582443PhrSn:example.org").to_owned(),
1362            content,
1363            origin_server_ts: ruma::MilliSecondsSinceUnixEpoch::now(),
1364            unsigned: Default::default(),
1365            other: Default::default(),
1366        }
1367    }
1368
1369    fn request_to_event<C>(
1370        recipient: &UserId,
1371        sender: &UserId,
1372        request: &crate::types::requests::OutgoingRequest,
1373    ) -> crate::types::events::ToDeviceEvent<C>
1374    where
1375        C: crate::types::events::EventType
1376            + serde::de::DeserializeOwned
1377            + serde::ser::Serialize
1378            + std::fmt::Debug,
1379    {
1380        let content = extract_content(recipient, request);
1381        let content: C = content.deserialize_as().unwrap_or_else(|_| {
1382            panic!("We can always deserialize the to-device event content {content:?}")
1383        });
1384
1385        crate::types::events::ToDeviceEvent::new(sender.to_owned(), content)
1386    }
1387
1388    #[async_test]
1389    async fn test_create_machine() {
1390        let machine = get_machine_test_helper().await;
1391
1392        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1393    }
1394
1395    #[async_test]
1396    async fn test_re_request_keys() {
1397        let machine = get_machine_test_helper().await;
1398        let account = account();
1399
1400        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1401
1402        let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1403        let event = wrap_encrypted_content(machine.user_id(), content);
1404
1405        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1406        let (cancel, request) = machine.request_key(session.room_id(), &event).await.unwrap();
1407
1408        assert!(cancel.is_none());
1409
1410        machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1411
1412        let (cancel, _) = machine.request_key(session.room_id(), &event).await.unwrap();
1413
1414        assert!(cancel.is_some());
1415    }
1416
1417    #[async_test]
1418    #[cfg(feature = "automatic-room-key-forwarding")]
1419    async fn test_create_key_request() {
1420        let machine = get_machine_test_helper().await;
1421        let account = account();
1422        let second_account = alice_2_account();
1423        let alice_device = DeviceData::from_account(&second_account);
1424
1425        // We need a trusted device, otherwise we won't request keys
1426        alice_device.set_trust_state(LocalTrust::Verified);
1427        machine.inner.store.save_device_data(&[alice_device]).await.unwrap();
1428
1429        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1430        let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1431        let event = wrap_encrypted_content(machine.user_id(), content);
1432
1433        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1434        machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
1435        assert!(!machine.outgoing_to_device_requests().await.unwrap().is_empty());
1436        assert_eq!(machine.outgoing_to_device_requests().await.unwrap().len(), 1);
1437
1438        machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
1439
1440        let requests = machine.outgoing_to_device_requests().await.unwrap();
1441        assert_eq!(requests.len(), 1);
1442
1443        let request = &requests[0];
1444
1445        machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1446        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1447    }
1448
1449    /// We should *not* request keys if that has been disabled
1450    #[async_test]
1451    #[cfg(feature = "automatic-room-key-forwarding")]
1452    async fn test_create_key_request_requests_disabled() {
1453        let machine = get_machine_test_helper().await;
1454        let account = account();
1455        let second_account = alice_2_account();
1456        let alice_device = DeviceData::from_account(&second_account);
1457
1458        // We need a trusted device, otherwise we won't request keys
1459        alice_device.set_trust_state(LocalTrust::Verified);
1460        machine.inner.store.save_device_data(&[alice_device]).await.unwrap();
1461
1462        // Disable key requests
1463        assert!(machine.are_room_key_requests_enabled());
1464        machine.set_room_key_requests_enabled(false);
1465        assert!(!machine.are_room_key_requests_enabled());
1466
1467        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1468        let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1469        let event = wrap_encrypted_content(machine.user_id(), content);
1470
1471        // The outgoing to-device requests should be empty before and after
1472        // `create_outgoing_key_request`.
1473        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1474        machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
1475        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1476    }
1477
1478    #[async_test]
1479    #[cfg(feature = "automatic-room-key-forwarding")]
1480    async fn test_receive_forwarded_key() {
1481        let machine = get_machine_test_helper().await;
1482        let account = account();
1483
1484        let second_account = alice_2_account();
1485        let alice_device = DeviceData::from_account(&second_account);
1486
1487        // We need a trusted device, otherwise we won't request keys
1488        alice_device.set_trust_state(LocalTrust::Verified);
1489        machine.inner.store.save_device_data(&[alice_device.clone()]).await.unwrap();
1490
1491        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1492        let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1493        let room_event = wrap_encrypted_content(machine.user_id(), content);
1494
1495        machine.create_outgoing_key_request(session.room_id(), &room_event).await.unwrap();
1496
1497        let requests = machine.outgoing_to_device_requests().await.unwrap();
1498        let request = &requests[0];
1499        let id = &request.request_id;
1500
1501        machine.mark_outgoing_request_as_sent(id).await.unwrap();
1502
1503        let export = session.export_at_index(10).await;
1504
1505        let content: ForwardedRoomKeyContent = export.try_into().unwrap();
1506
1507        let event = DecryptedOlmV1Event::new(
1508            alice_id(),
1509            alice_id(),
1510            alice_device.ed25519_key().unwrap(),
1511            None,
1512            content,
1513        );
1514
1515        assert!(machine
1516            .inner
1517            .store
1518            .get_inbound_group_session(session.room_id(), session.session_id(),)
1519            .await
1520            .unwrap()
1521            .is_none());
1522
1523        let first_session = machine
1524            .receive_forwarded_room_key(alice_device.curve25519_key().unwrap(), &event)
1525            .await
1526            .unwrap();
1527        let first_session = first_session.unwrap();
1528
1529        assert_eq!(first_session.first_known_index(), 10);
1530
1531        machine.inner.store.save_inbound_group_sessions(&[first_session.clone()]).await.unwrap();
1532
1533        // Get the cancel request.
1534        let id = machine
1535            .inner
1536            .outgoing_requests
1537            .read()
1538            .first_key_value()
1539            .map(|(_, r)| r.request_id.clone())
1540            .unwrap();
1541        machine.mark_outgoing_request_as_sent(&id).await.unwrap();
1542
1543        machine.create_outgoing_key_request(session.room_id(), &room_event).await.unwrap();
1544
1545        let requests = machine.outgoing_to_device_requests().await.unwrap();
1546        let request = &requests[0];
1547
1548        machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1549
1550        let export = session.export_at_index(15).await;
1551
1552        let content: ForwardedRoomKeyContent = export.try_into().unwrap();
1553
1554        let event = DecryptedOlmV1Event::new(
1555            alice_id(),
1556            alice_id(),
1557            alice_device.ed25519_key().unwrap(),
1558            None,
1559            content,
1560        );
1561
1562        let second_session = machine
1563            .receive_forwarded_room_key(alice_device.curve25519_key().unwrap(), &event)
1564            .await
1565            .unwrap();
1566
1567        assert!(second_session.is_none());
1568
1569        let export = session.export_at_index(0).await;
1570
1571        let content: ForwardedRoomKeyContent = export.try_into().unwrap();
1572
1573        let event = DecryptedOlmV1Event::new(
1574            alice_id(),
1575            alice_id(),
1576            alice_device.ed25519_key().unwrap(),
1577            None,
1578            content,
1579        );
1580
1581        let second_session = machine
1582            .receive_forwarded_room_key(alice_device.curve25519_key().unwrap(), &event)
1583            .await
1584            .unwrap();
1585
1586        assert_eq!(second_session.unwrap().first_known_index(), 0);
1587    }
1588
1589    #[async_test]
1590    #[cfg(feature = "automatic-room-key-forwarding")]
1591    async fn test_should_share_key() {
1592        let machine = get_machine_test_helper().await;
1593        let account = account();
1594
1595        let own_device =
1596            machine.inner.store.get_device(alice_id(), alice2_device_id()).await.unwrap().unwrap();
1597
1598        let (outbound, inbound) = account.create_group_session_pair_with_defaults(room_id()).await;
1599
1600        // We don't share keys with untrusted devices.
1601        assert_matches!(
1602            machine.should_share_key(&own_device, &inbound).await,
1603            Err(KeyForwardDecision::UntrustedDevice)
1604        );
1605        own_device.set_trust_state(LocalTrust::Verified);
1606        // Now we do want to share the keys.
1607        machine.should_share_key(&own_device, &inbound).await.unwrap();
1608
1609        let bob_device = DeviceData::from_account(&bob_account());
1610        machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1611
1612        let bob_device =
1613            machine.inner.store.get_device(bob_id(), bob_device_id()).await.unwrap().unwrap();
1614
1615        // We don't share sessions with other user's devices if no outbound
1616        // session was provided.
1617        assert_matches!(
1618            machine.should_share_key(&bob_device, &inbound).await,
1619            Err(KeyForwardDecision::MissingOutboundSession)
1620        );
1621
1622        let mut changes = Changes::default();
1623
1624        changes.outbound_group_sessions.push(outbound.clone());
1625        changes.inbound_group_sessions.push(inbound.clone());
1626        machine.inner.store.save_changes(changes).await.unwrap();
1627        machine.inner.outbound_group_sessions.insert(outbound.clone());
1628
1629        // We don't share sessions with other user's devices if the session
1630        // wasn't shared in the first place.
1631        assert_matches!(
1632            machine.should_share_key(&bob_device, &inbound).await,
1633            Err(KeyForwardDecision::OutboundSessionNotShared)
1634        );
1635
1636        bob_device.set_trust_state(LocalTrust::Verified);
1637
1638        // We don't share sessions with other user's devices if the session
1639        // wasn't shared in the first place even if the device is trusted.
1640        assert_matches!(
1641            machine.should_share_key(&bob_device, &inbound).await,
1642            Err(KeyForwardDecision::OutboundSessionNotShared)
1643        );
1644
1645        // We now share the session, since it was shared before.
1646        outbound
1647            .mark_shared_with(
1648                bob_device.user_id(),
1649                bob_device.device_id(),
1650                bob_device.curve25519_key().unwrap(),
1651            )
1652            .await;
1653        machine.should_share_key(&bob_device, &inbound).await.unwrap();
1654
1655        let (other_outbound, other_inbound) =
1656            account.create_group_session_pair_with_defaults(room_id()).await;
1657
1658        // But we don't share some other session that doesn't match our outbound
1659        // session.
1660        assert_matches!(
1661            machine.should_share_key(&bob_device, &other_inbound).await,
1662            Err(KeyForwardDecision::MissingOutboundSession)
1663        );
1664
1665        // Finally, let's ensure we don't share the session with a device that rotated
1666        // its curve25519 key.
1667        let bob_device = DeviceData::from_account(&bob_account());
1668        machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1669
1670        let bob_device =
1671            machine.inner.store.get_device(bob_id(), bob_device_id()).await.unwrap().unwrap();
1672        assert_matches!(
1673            machine.should_share_key(&bob_device, &inbound).await,
1674            Err(KeyForwardDecision::ChangedSenderKey)
1675        );
1676
1677        // Now let's encrypt some messages in another session to increment the message
1678        // index and then share it with our own untrusted device.
1679        own_device.set_trust_state(LocalTrust::Unset);
1680
1681        for _ in 1..=3 {
1682            other_outbound.encrypt_helper("foo".to_owned()).await;
1683        }
1684        other_outbound
1685            .mark_shared_with(
1686                own_device.user_id(),
1687                own_device.device_id(),
1688                own_device.curve25519_key().unwrap(),
1689            )
1690            .await;
1691
1692        machine.inner.outbound_group_sessions.insert(other_outbound.clone());
1693
1694        // Since our device is untrusted, we should share the session starting only from
1695        // the current index (at which the message was marked as shared). This
1696        // should be 3 since we encrypted 3 messages.
1697        assert_matches!(machine.should_share_key(&own_device, &other_inbound).await, Ok(Some(3)));
1698
1699        own_device.set_trust_state(LocalTrust::Verified);
1700
1701        // However once our device is trusted, we share the entire session.
1702        assert_matches!(machine.should_share_key(&own_device, &other_inbound).await, Ok(None));
1703    }
1704
1705    #[cfg(feature = "automatic-room-key-forwarding")]
1706    async fn test_key_share_cycle(algorithm: EventEncryptionAlgorithm) {
1707        let (alice_machine, group_session, bob_machine) =
1708            machines_for_key_share_test_helper(alice_id(), true, algorithm).await;
1709
1710        // Get the request and convert it into a event.
1711        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
1712        let request = &requests[0];
1713        let event = request_to_event(alice_id(), alice_id(), request);
1714
1715        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1716
1717        // Bob doesn't have any outgoing requests.
1718        assert!(bob_machine.inner.outgoing_requests.read().is_empty());
1719
1720        // Receive the room key request from alice.
1721        bob_machine.receive_incoming_key_request(&event);
1722
1723        {
1724            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
1725            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
1726        }
1727        // Now bob does have an outgoing request.
1728        assert!(!bob_machine.inner.outgoing_requests.read().is_empty());
1729
1730        // Get the request and convert it to a encrypted to-device event.
1731        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
1732        let request = &requests[0];
1733
1734        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), alice_id(), request);
1735        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1736
1737        // Check that alice doesn't have the session.
1738        assert!(alice_machine
1739            .inner
1740            .store
1741            .get_inbound_group_session(room_id(), group_session.session_id())
1742            .await
1743            .unwrap()
1744            .is_none());
1745
1746        let decrypted = alice_machine
1747            .inner
1748            .store
1749            .with_transaction(|mut tr| async {
1750                let res = tr
1751                    .account()
1752                    .await?
1753                    .decrypt_to_device_event(&alice_machine.inner.store, &event)
1754                    .await?;
1755                Ok((tr, res))
1756            })
1757            .await
1758            .unwrap();
1759
1760        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
1761            panic!("Invalid decrypted event type");
1762        };
1763
1764        let session = alice_machine
1765            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
1766            .await
1767            .unwrap();
1768        alice_machine.inner.store.save_inbound_group_sessions(&[session.unwrap()]).await.unwrap();
1769
1770        // Check that alice now does have the session.
1771        let session = alice_machine
1772            .inner
1773            .store
1774            .get_inbound_group_session(room_id(), group_session.session_id())
1775            .await
1776            .unwrap()
1777            .unwrap();
1778
1779        assert_eq!(session.session_id(), group_session.session_id())
1780    }
1781
1782    #[async_test]
1783    #[cfg(feature = "automatic-room-key-forwarding")]
1784    async fn test_reject_forward_from_another_user() {
1785        let (alice_machine, group_session, bob_machine) = machines_for_key_share_test_helper(
1786            bob_id(),
1787            true,
1788            EventEncryptionAlgorithm::MegolmV1AesSha2,
1789        )
1790        .await;
1791
1792        // Get the request and convert it into a event.
1793        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
1794        let request = &requests[0];
1795        let event = request_to_event(alice_id(), alice_id(), request);
1796
1797        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1798
1799        // Bob doesn't have any outgoing requests.
1800        assert!(bob_machine.inner.outgoing_requests.read().is_empty());
1801
1802        // Receive the room key request from alice.
1803        bob_machine.receive_incoming_key_request(&event);
1804        {
1805            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
1806            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
1807        }
1808        // Now bob does have an outgoing request.
1809        assert!(!bob_machine.inner.outgoing_requests.read().is_empty());
1810
1811        // Get the request and convert it to a encrypted to-device event.
1812        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
1813        let request = &requests[0];
1814
1815        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), bob_id(), request);
1816        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1817
1818        // Check that alice doesn't have the session.
1819        assert!(alice_machine
1820            .inner
1821            .store
1822            .get_inbound_group_session(room_id(), group_session.session_id())
1823            .await
1824            .unwrap()
1825            .is_none());
1826
1827        let decrypted = alice_machine
1828            .inner
1829            .store
1830            .with_transaction(|mut tr| async {
1831                let res = tr
1832                    .account()
1833                    .await?
1834                    .decrypt_to_device_event(&alice_machine.inner.store, &event)
1835                    .await?;
1836                Ok((tr, res))
1837            })
1838            .await
1839            .unwrap();
1840        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
1841            panic!("Invalid decrypted event type");
1842        };
1843
1844        let session = alice_machine
1845            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
1846            .await
1847            .unwrap();
1848
1849        assert!(session.is_none(), "We should not receive a room key from another user");
1850    }
1851
1852    #[async_test]
1853    #[cfg(feature = "automatic-room-key-forwarding")]
1854    async fn test_key_share_cycle_megolm_v1() {
1855        test_key_share_cycle(EventEncryptionAlgorithm::MegolmV1AesSha2).await;
1856    }
1857
1858    #[async_test]
1859    #[cfg(all(feature = "experimental-algorithms", feature = "automatic-room-key-forwarding"))]
1860    async fn test_key_share_cycle_megolm_v2() {
1861        test_key_share_cycle(EventEncryptionAlgorithm::MegolmV2AesSha2).await;
1862    }
1863
1864    #[async_test]
1865    async fn test_secret_share_cycle() {
1866        let alice_machine = get_machine_test_helper().await;
1867
1868        let mut second_account = alice_2_account();
1869        let alice_device = DeviceData::from_account(&second_account);
1870
1871        let bob_account = bob_account();
1872        let bob_device = DeviceData::from_account(&bob_account);
1873
1874        alice_machine.inner.store.save_device_data(&[alice_device.clone()]).await.unwrap();
1875
1876        // Create Olm sessions for our two accounts.
1877        let alice_session = alice_machine
1878            .inner
1879            .store
1880            .with_transaction(|mut tr| async {
1881                let alice_account = tr.account().await?;
1882                let (alice_session, _) =
1883                    alice_account.create_session_for_test_helper(&mut second_account).await;
1884                Ok((tr, alice_session))
1885            })
1886            .await
1887            .unwrap();
1888
1889        alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
1890
1891        let event = RumaToDeviceEvent {
1892            sender: bob_account.user_id().to_owned(),
1893            content: ToDeviceSecretRequestEventContent::new(
1894                RequestAction::Request(SecretName::CrossSigningMasterKey),
1895                bob_account.device_id().to_owned(),
1896                "request_id".into(),
1897            ),
1898        };
1899
1900        // No secret found
1901        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1902        alice_machine.receive_incoming_secret_request(&event);
1903        {
1904            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1905            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1906        }
1907        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1908
1909        // No device found
1910        alice_machine.inner.store.reset_cross_signing_identity().await;
1911        alice_machine.receive_incoming_secret_request(&event);
1912        {
1913            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1914            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1915        }
1916        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1917
1918        alice_machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1919
1920        // The device doesn't belong to us
1921        alice_machine.inner.store.reset_cross_signing_identity().await;
1922        alice_machine.receive_incoming_secret_request(&event);
1923        {
1924            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1925            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1926        }
1927        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1928
1929        let event = RumaToDeviceEvent {
1930            sender: alice_id().to_owned(),
1931            content: ToDeviceSecretRequestEventContent::new(
1932                RequestAction::Request(SecretName::CrossSigningMasterKey),
1933                second_account.device_id().into(),
1934                "request_id".into(),
1935            ),
1936        };
1937
1938        // The device isn't trusted
1939        alice_machine.receive_incoming_secret_request(&event);
1940        {
1941            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1942            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1943        }
1944        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1945
1946        // We need a trusted device, otherwise we won't serve secrets
1947        alice_device.set_trust_state(LocalTrust::Verified);
1948        alice_machine.inner.store.save_device_data(&[alice_device.clone()]).await.unwrap();
1949
1950        alice_machine.receive_incoming_secret_request(&event);
1951        {
1952            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1953            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1954        }
1955        assert!(!alice_machine.inner.outgoing_requests.read().is_empty());
1956    }
1957
1958    #[async_test]
1959    async fn test_secret_broadcasting() {
1960        use futures_util::{pin_mut, FutureExt};
1961        use ruma::api::client::to_device::send_event_to_device::v3::Response as ToDeviceResponse;
1962        use serde_json::value::to_raw_value;
1963        use tokio_stream::StreamExt;
1964
1965        use crate::{
1966            machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper,
1967            EncryptionSyncChanges,
1968        };
1969
1970        let alice_id = user_id!("@alice:localhost");
1971
1972        let (alice_machine, bob_machine) =
1973            get_machine_pair_with_setup_sessions_test_helper(alice_id, alice_id, false).await;
1974
1975        let key_requests = GossipMachine::request_missing_secrets(
1976            bob_machine.user_id(),
1977            vec![SecretName::RecoveryKey],
1978        );
1979        let mut changes = Changes::default();
1980        let request_id = key_requests[0].request_id.to_owned();
1981        changes.key_requests = key_requests;
1982        bob_machine.store().save_changes(changes).await.unwrap();
1983        for request in bob_machine.outgoing_requests().await.unwrap() {
1984            bob_machine
1985                .mark_request_as_sent(request.request_id(), &ToDeviceResponse::new())
1986                .await
1987                .unwrap();
1988        }
1989
1990        let event = RumaToDeviceEvent {
1991            sender: alice_machine.user_id().to_owned(),
1992            content: ToDeviceSecretRequestEventContent::new(
1993                RequestAction::Request(SecretName::RecoveryKey),
1994                bob_machine.device_id().to_owned(),
1995                request_id,
1996            ),
1997        };
1998
1999        let bob_device = alice_machine
2000            .get_device(alice_id, bob_machine.device_id(), None)
2001            .await
2002            .unwrap()
2003            .unwrap();
2004        let alice_device = bob_machine
2005            .get_device(alice_id, alice_machine.device_id(), None)
2006            .await
2007            .unwrap()
2008            .unwrap();
2009
2010        // We need a trusted device, otherwise we won't serve nor accept secrets.
2011        bob_device.set_trust_state(LocalTrust::Verified);
2012        alice_device.set_trust_state(LocalTrust::Verified);
2013        alice_machine.store().save_device_data(&[bob_device.inner]).await.unwrap();
2014        bob_machine.store().save_device_data(&[alice_device.inner]).await.unwrap();
2015
2016        let decryption_key = crate::store::BackupDecryptionKey::new().unwrap();
2017        alice_machine
2018            .backup_machine()
2019            .save_decryption_key(Some(decryption_key), None)
2020            .await
2021            .unwrap();
2022        alice_machine.inner.key_request_machine.receive_incoming_secret_request(&event);
2023        {
2024            let alice_cache = alice_machine.store().cache().await.unwrap();
2025            alice_machine
2026                .inner
2027                .key_request_machine
2028                .collect_incoming_key_requests(&alice_cache)
2029                .await
2030                .unwrap();
2031        }
2032
2033        let requests =
2034            alice_machine.inner.key_request_machine.outgoing_to_device_requests().await.unwrap();
2035
2036        assert_eq!(requests.len(), 1);
2037        let request = requests.first().expect("We should have an outgoing to-device request");
2038
2039        let event: EncryptedToDeviceEvent =
2040            request_to_event(bob_machine.user_id(), alice_machine.user_id(), request);
2041        let event = Raw::from_json(to_raw_value(&event).unwrap());
2042
2043        let stream = bob_machine.store().secrets_stream();
2044        pin_mut!(stream);
2045
2046        bob_machine
2047            .receive_sync_changes(EncryptionSyncChanges {
2048                to_device_events: vec![event],
2049                changed_devices: &Default::default(),
2050                one_time_keys_counts: &Default::default(),
2051                unused_fallback_keys: None,
2052                next_batch_token: None,
2053            })
2054            .await
2055            .unwrap();
2056
2057        stream.next().now_or_never().expect("The broadcaster should have sent out the secret");
2058    }
2059
2060    #[async_test]
2061    #[cfg(feature = "automatic-room-key-forwarding")]
2062    async fn test_key_share_cycle_without_session() {
2063        let (alice_machine, group_session, bob_machine) = machines_for_key_share_test_helper(
2064            alice_id(),
2065            false,
2066            EventEncryptionAlgorithm::MegolmV1AesSha2,
2067        )
2068        .await;
2069
2070        // Get the request and convert it into a event.
2071        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
2072        let request = &requests[0];
2073        let event = request_to_event(alice_id(), alice_id(), request);
2074
2075        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
2076
2077        // Bob doesn't have any outgoing requests.
2078        assert!(bob_machine.outgoing_to_device_requests().await.unwrap().is_empty());
2079        assert!(bob_machine.inner.users_for_key_claim.read().is_empty());
2080        assert!(bob_machine.inner.wait_queue.is_empty());
2081
2082        // Receive the room key request from alice.
2083        bob_machine.receive_incoming_key_request(&event);
2084        {
2085            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
2086            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
2087        }
2088        // Bob only has a keys claim request, since we're lacking a session
2089        assert_eq!(bob_machine.outgoing_to_device_requests().await.unwrap().len(), 1);
2090        assert_matches!(
2091            bob_machine.outgoing_to_device_requests().await.unwrap()[0].request(),
2092            AnyOutgoingRequest::KeysClaim(_)
2093        );
2094        assert!(!bob_machine.inner.users_for_key_claim.read().is_empty());
2095        assert!(!bob_machine.inner.wait_queue.is_empty());
2096
2097        let (alice_session, bob_session) = alice_machine
2098            .inner
2099            .store
2100            .with_transaction(|mut atr| async {
2101                let res = bob_machine
2102                    .inner
2103                    .store
2104                    .with_transaction(|mut btr| async {
2105                        let alice_account = atr.account().await?;
2106                        let bob_account = btr.account().await?;
2107                        let sessions =
2108                            alice_account.create_session_for_test_helper(bob_account).await;
2109                        Ok((btr, sessions))
2110                    })
2111                    .await?;
2112                Ok((atr, res))
2113            })
2114            .await
2115            .unwrap();
2116
2117        // We create a session now.
2118        alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
2119        bob_machine.inner.store.save_sessions(&[bob_session]).await.unwrap();
2120
2121        bob_machine.retry_keyshare(alice_id(), alice_device_id());
2122        assert!(bob_machine.inner.users_for_key_claim.read().is_empty());
2123        {
2124            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
2125            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
2126        }
2127        // Bob now has an outgoing requests.
2128        assert!(!bob_machine.outgoing_to_device_requests().await.unwrap().is_empty());
2129        assert!(bob_machine.inner.wait_queue.is_empty());
2130
2131        // Get the request and convert it to a encrypted to-device event.
2132        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
2133        let request = &requests[0];
2134
2135        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), alice_id(), request);
2136        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
2137
2138        // Check that alice doesn't have the session.
2139        assert!(alice_machine
2140            .inner
2141            .store
2142            .get_inbound_group_session(room_id(), group_session.session_id())
2143            .await
2144            .unwrap()
2145            .is_none());
2146
2147        let decrypted = alice_machine
2148            .inner
2149            .store
2150            .with_transaction(|mut tr| async {
2151                let res = tr
2152                    .account()
2153                    .await?
2154                    .decrypt_to_device_event(&alice_machine.inner.store, &event)
2155                    .await?;
2156                Ok((tr, res))
2157            })
2158            .await
2159            .unwrap();
2160
2161        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
2162            panic!("Invalid decrypted event type");
2163        };
2164
2165        let session = alice_machine
2166            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
2167            .await
2168            .unwrap();
2169        alice_machine.inner.store.save_inbound_group_sessions(&[session.unwrap()]).await.unwrap();
2170
2171        // Check that alice now does have the session.
2172        let session = alice_machine
2173            .inner
2174            .store
2175            .get_inbound_group_session(room_id(), group_session.session_id())
2176            .await
2177            .unwrap()
2178            .unwrap();
2179
2180        assert_eq!(session.session_id(), group_session.session_id())
2181    }
2182}