matrix_sdk_crypto/gossiping/
machine.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
// TODO
//
// Handle the case where we can't create a session with a device, clearing out
// stale key share requests that we'll never be able to handle.
//
// If we don't trust the device, store an object that remembers the request and
// let the users introspect that object.
22
23use std::{
24    collections::{btree_map::Entry, BTreeMap, BTreeSet},
25    mem,
26    sync::{
27        atomic::{AtomicBool, Ordering},
28        Arc,
29    },
30};
31
32use matrix_sdk_common::locks::RwLock as StdRwLock;
33use ruma::{
34    api::client::keys::claim_keys::v3::Request as KeysClaimRequest,
35    events::secret::request::{
36        RequestAction, SecretName, ToDeviceSecretRequestEvent as SecretRequestEvent,
37    },
38    DeviceId, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedTransactionId, OwnedUserId, RoomId,
39    TransactionId, UserId,
40};
41use tracing::{debug, field::debug, info, instrument, trace, warn, Span};
42use vodozemac::{megolm::SessionOrdering, Curve25519PublicKey};
43
44use super::{GossipRequest, GossippedSecret, RequestEvent, RequestInfo, SecretInfo, WaitQueue};
45use crate::{
46    error::{EventError, OlmError, OlmResult},
47    identities::IdentityManager,
48    olm::{InboundGroupSession, Session},
49    session_manager::GroupSessionCache,
50    store::{Changes, CryptoStoreError, SecretImportError, Store, StoreCache},
51    types::{
52        events::{
53            forwarded_room_key::ForwardedRoomKeyContent,
54            olm_v1::{DecryptedForwardedRoomKeyEvent, DecryptedSecretSendEvent},
55            room::encrypted::EncryptedEvent,
56            room_key_request::RoomKeyRequestEvent,
57            secret_send::SecretSendContent,
58            EventType,
59        },
60        requests::{OutgoingRequest, ToDeviceRequest},
61    },
62    Device, MegolmError,
63};
64
/// Handle to the gossiping state machine.
///
/// The actual state lives in a [`GossipMachineInner`] behind an `Arc`, so
/// cloning this type yields another handle to the same state.
#[derive(Clone, Debug)]
pub(crate) struct GossipMachine {
    /// The shared state of the machine.
    inner: Arc<GossipMachineInner>,
}
69
/// The state shared by all clones of a [`GossipMachine`].
#[derive(Debug)]
pub(crate) struct GossipMachineInner {
    /// The crypto store, used to look up devices, secrets, inbound group
    /// sessions and persisted outgoing secret requests.
    store: Store,
    /// Cache of outbound group sessions, consulted when deciding whether a
    /// room key may be forwarded to a requesting device.
    #[cfg(feature = "automatic-room-key-forwarding")]
    outbound_group_sessions: GroupSessionCache,
    /// To-device requests queued in memory, keyed by their transaction ID,
    /// until they are marked as sent.
    outgoing_requests: StdRwLock<BTreeMap<OwnedTransactionId, OutgoingRequest>>,
    /// Received request events waiting to be handled the next time the
    /// incoming requests are collected.
    incoming_key_requests: StdRwLock<BTreeMap<RequestInfo, RequestEvent>>,
    /// Requests that couldn't be answered because we are missing an Olm
    /// session with the requesting device.
    wait_queue: WaitQueue,
    /// Users and devices for which a `/keys/claim` request should be sent out.
    /// The map is handed in by the caller of `GossipMachine::new`.
    users_for_key_claim: Arc<StdRwLock<BTreeMap<OwnedUserId, BTreeSet<OwnedDeviceId>>>>,

    /// Whether we should respond to incoming `m.room_key_request` messages.
    room_key_forwarding_enabled: AtomicBool,

    /// Whether we should send out `m.room_key_request` messages.
    room_key_requests_enabled: AtomicBool,

    /// Used to mark a user as changed when a request arrives from a device we
    /// don't know about.
    identity_manager: IdentityManager,
}
88
89impl GossipMachine {
90    pub fn new(
91        store: Store,
92        identity_manager: IdentityManager,
93        #[allow(unused)] outbound_group_sessions: GroupSessionCache,
94        users_for_key_claim: Arc<StdRwLock<BTreeMap<OwnedUserId, BTreeSet<OwnedDeviceId>>>>,
95    ) -> Self {
96        let room_key_forwarding_enabled =
97            AtomicBool::new(cfg!(feature = "automatic-room-key-forwarding"));
98
99        let room_key_requests_enabled =
100            AtomicBool::new(cfg!(feature = "automatic-room-key-forwarding"));
101
102        Self {
103            inner: Arc::new(GossipMachineInner {
104                store,
105                #[cfg(feature = "automatic-room-key-forwarding")]
106                outbound_group_sessions,
107                outgoing_requests: Default::default(),
108                incoming_key_requests: Default::default(),
109                wait_queue: WaitQueue::new(),
110                users_for_key_claim,
111                room_key_forwarding_enabled,
112                room_key_requests_enabled,
113                identity_manager,
114            }),
115        }
116    }
117
118    pub(crate) fn identity_manager(&self) -> &IdentityManager {
119        &self.inner.identity_manager
120    }
121
122    #[cfg(feature = "automatic-room-key-forwarding")]
123    pub fn set_room_key_forwarding_enabled(&self, enabled: bool) {
124        self.inner.room_key_forwarding_enabled.store(enabled, Ordering::SeqCst)
125    }
126
127    pub fn is_room_key_forwarding_enabled(&self) -> bool {
128        self.inner.room_key_forwarding_enabled.load(Ordering::SeqCst)
129    }
130
131    /// Configure whether we should send outgoing `m.room_key_request`s on
132    /// decryption failure.
133    #[cfg(feature = "automatic-room-key-forwarding")]
134    pub fn set_room_key_requests_enabled(&self, enabled: bool) {
135        self.inner.room_key_requests_enabled.store(enabled, Ordering::SeqCst)
136    }
137
138    /// Query whether we should send outgoing `m.room_key_request`s on
139    /// decryption failure.
140    pub fn are_room_key_requests_enabled(&self) -> bool {
141        self.inner.room_key_requests_enabled.load(Ordering::SeqCst)
142    }
143
144    /// Load stored outgoing requests that were not yet sent out.
145    async fn load_outgoing_requests(&self) -> Result<Vec<OutgoingRequest>, CryptoStoreError> {
146        Ok(self
147            .inner
148            .store
149            .get_unsent_secret_requests()
150            .await?
151            .into_iter()
152            .filter(|i| !i.sent_out)
153            .map(|info| info.to_request(self.device_id()))
154            .collect())
155    }
156
157    /// Our own user id.
158    pub fn user_id(&self) -> &UserId {
159        &self.inner.store.static_account().user_id
160    }
161
162    /// Our own device ID.
163    pub fn device_id(&self) -> &DeviceId {
164        &self.inner.store.static_account().device_id
165    }
166
167    pub async fn outgoing_to_device_requests(
168        &self,
169    ) -> Result<Vec<OutgoingRequest>, CryptoStoreError> {
170        let mut key_requests = self.load_outgoing_requests().await?;
171        let key_forwards: Vec<OutgoingRequest> =
172            self.inner.outgoing_requests.read().values().cloned().collect();
173        key_requests.extend(key_forwards);
174
175        let users_for_key_claim: BTreeMap<_, _> = self
176            .inner
177            .users_for_key_claim
178            .read()
179            .iter()
180            .map(|(key, value)| {
181                let device_map = value
182                    .iter()
183                    .map(|d| (d.to_owned(), OneTimeKeyAlgorithm::SignedCurve25519))
184                    .collect();
185
186                (key.to_owned(), device_map)
187            })
188            .collect();
189
190        if !users_for_key_claim.is_empty() {
191            let key_claim_request = KeysClaimRequest::new(users_for_key_claim);
192            key_requests.push(OutgoingRequest {
193                request_id: TransactionId::new(),
194                request: Arc::new(key_claim_request.into()),
195            });
196        }
197
198        Ok(key_requests)
199    }
200
201    /// Receive a room key request event.
202    pub fn receive_incoming_key_request(&self, event: &RoomKeyRequestEvent) {
203        self.receive_event(event.clone().into())
204    }
205
206    fn receive_event(&self, event: RequestEvent) {
207        // Some servers might send to-device events to ourselves if we send one
208        // out using a wildcard instead of a specific device as a recipient.
209        //
210        // Check if we're the sender of this request event and ignore it if
211        // so.
212        if event.sender() == self.user_id() && event.requesting_device_id() == self.device_id() {
213            trace!("Received a secret request event from ourselves, ignoring")
214        } else {
215            let request_info = event.to_request_info();
216            self.inner.incoming_key_requests.write().insert(request_info, event);
217        }
218    }
219
220    pub fn receive_incoming_secret_request(&self, event: &SecretRequestEvent) {
221        self.receive_event(event.clone().into())
222    }
223
224    /// Handle all the incoming key requests that are queued up and empty our
225    /// key request queue.
226    pub async fn collect_incoming_key_requests(
227        &self,
228        cache: &StoreCache,
229    ) -> OlmResult<Vec<Session>> {
230        let mut changed_sessions = Vec::new();
231
232        let incoming_key_requests = mem::take(&mut *self.inner.incoming_key_requests.write());
233
234        for event in incoming_key_requests.values() {
235            if let Some(s) = match event {
236                #[cfg(feature = "automatic-room-key-forwarding")]
237                RequestEvent::KeyShare(e) => Box::pin(self.handle_key_request(cache, e)).await?,
238                RequestEvent::Secret(e) => Box::pin(self.handle_secret_request(cache, e)).await?,
239                #[cfg(not(feature = "automatic-room-key-forwarding"))]
240                _ => None,
241            } {
242                changed_sessions.push(s);
243            }
244        }
245
246        Ok(changed_sessions)
247    }
248
249    /// Store the key share request for later, once we get an Olm session with
250    /// the given device [`retry_keyshare`](#method.retry_keyshare) should be
251    /// called.
252    fn handle_key_share_without_session(&self, device: Device, event: RequestEvent) {
253        self.inner
254            .users_for_key_claim
255            .write()
256            .entry(device.user_id().to_owned())
257            .or_default()
258            .insert(device.device_id().into());
259        self.inner.wait_queue.insert(&device, event);
260    }
261
262    /// Retry keyshares for a device that previously didn't have an Olm session
263    /// with us.
264    ///
265    /// This should be only called if the given user/device got a new Olm
266    /// session.
267    ///
268    /// # Arguments
269    ///
270    /// * `user_id` - The user id of the device that we created the Olm session
271    ///   with.
272    ///
273    /// * `device_id` - The device ID of the device that got the Olm session.
274    pub fn retry_keyshare(&self, user_id: &UserId, device_id: &DeviceId) {
275        if let Entry::Occupied(mut e) =
276            self.inner.users_for_key_claim.write().entry(user_id.to_owned())
277        {
278            e.get_mut().remove(device_id);
279
280            if e.get().is_empty() {
281                e.remove();
282            }
283        }
284
285        let mut incoming_key_requests = self.inner.incoming_key_requests.write();
286        for (key, event) in self.inner.wait_queue.remove(user_id, device_id) {
287            incoming_key_requests.entry(key).or_insert(event);
288        }
289    }
290
291    async fn handle_secret_request(
292        &self,
293        cache: &StoreCache,
294        event: &SecretRequestEvent,
295    ) -> OlmResult<Option<Session>> {
296        let secret_name = match &event.content.action {
297            RequestAction::Request(s) => s,
298            // We ignore cancellations here since there's nothing to serve.
299            RequestAction::RequestCancellation => return Ok(None),
300            action => {
301                warn!(?action, "Unknown secret request action");
302                return Ok(None);
303            }
304        };
305
306        let content = if let Some(secret) = self.inner.store.export_secret(secret_name).await? {
307            SecretSendContent::new(event.content.request_id.to_owned(), secret)
308        } else {
309            info!(?secret_name, "Can't serve a secret request, secret isn't found");
310            return Ok(None);
311        };
312
313        let device =
314            self.inner.store.get_device(&event.sender, &event.content.requesting_device_id).await?;
315
316        if let Some(device) = device {
317            if device.user_id() == self.user_id() {
318                if device.is_verified() {
319                    info!(
320                        user_id = ?device.user_id(),
321                        device_id = ?device.device_id(),
322                        ?secret_name,
323                        "Sharing a secret with a device",
324                    );
325
326                    match self.share_secret(&device, content).await {
327                        Ok(s) => Ok(Some(s)),
328                        Err(OlmError::MissingSession) => {
329                            info!(
330                                user_id = ?device.user_id(),
331                                device_id = ?device.device_id(),
332                                ?secret_name,
333                                "Secret request is missing an Olm session, \
334                                putting the request in the wait queue",
335                            );
336                            self.handle_key_share_without_session(device, event.clone().into());
337
338                            Ok(None)
339                        }
340                        Err(e) => Err(e),
341                    }
342                } else {
343                    info!(
344                        user_id = ?device.user_id(),
345                        device_id = ?device.device_id(),
346                        ?secret_name,
347                        "Received a secret request that we won't serve, the device isn't trusted",
348                    );
349
350                    Ok(None)
351                }
352            } else {
353                info!(
354                    user_id = ?device.user_id(),
355                    device_id = ?device.device_id(),
356                    ?secret_name,
357                    "Received a secret request that we won't serve, the device doesn't belong to us",
358                );
359
360                Ok(None)
361            }
362        } else {
363            warn!(
364                user_id = ?event.sender,
365                device_id = ?event.content.requesting_device_id,
366                ?secret_name,
367                "Received a secret request from an unknown device",
368            );
369            self.inner
370                .identity_manager
371                .key_query_manager
372                .synced(cache)
373                .await?
374                .mark_user_as_changed(&event.sender)
375                .await?;
376
377            Ok(None)
378        }
379    }
380
381    /// Try to encrypt the given `InboundGroupSession` for the given `Device` as
382    /// a forwarded room key.
383    ///
384    /// This method might fail if we do not share an 1-to-1 Olm session with the
385    /// given `Device`, in that case we're going to queue up an
386    /// `/keys/claim` request to be sent out and retry once the 1-to-1 Olm
387    /// session has been established.
388    #[cfg(feature = "automatic-room-key-forwarding")]
389    async fn try_to_forward_room_key(
390        &self,
391        event: &RoomKeyRequestEvent,
392        device: Device,
393        session: &InboundGroupSession,
394        message_index: Option<u32>,
395    ) -> OlmResult<Option<Session>> {
396        info!(?message_index, "Serving a room key request",);
397
398        match self.forward_room_key(session, &device, message_index).await {
399            Ok(s) => Ok(Some(s)),
400            Err(OlmError::MissingSession) => {
401                info!(
402                    "Key request is missing an Olm session, putting the request in the wait queue",
403                );
404                self.handle_key_share_without_session(device, event.to_owned().into());
405
406                Ok(None)
407            }
408            Err(OlmError::SessionExport(e)) => {
409                warn!(
410                    "Can't serve a room key request, the session \
411                     can't be exported into a forwarded room key: {e:?}",
412                );
413                Ok(None)
414            }
415            Err(e) => Err(e),
416        }
417    }
418
419    /// Answer a room key request after we found the matching
420    /// `InboundGroupSession`.
421    #[cfg(feature = "automatic-room-key-forwarding")]
422    async fn answer_room_key_request(
423        &self,
424        cache: &StoreCache,
425        event: &RoomKeyRequestEvent,
426        session: &InboundGroupSession,
427    ) -> OlmResult<Option<Session>> {
428        use super::KeyForwardDecision;
429
430        let device =
431            self.inner.store.get_device(&event.sender, &event.content.requesting_device_id).await?;
432
433        let Some(device) = device else {
434            warn!("Received a key request from an unknown device");
435            self.identity_manager()
436                .key_query_manager
437                .synced(cache)
438                .await?
439                .mark_user_as_changed(&event.sender)
440                .await?;
441
442            return Ok(None);
443        };
444
445        match self.should_share_key(&device, session).await {
446            Ok(message_index) => {
447                self.try_to_forward_room_key(event, device, session, message_index).await
448            }
449            Err(e) => {
450                if let KeyForwardDecision::ChangedSenderKey = e {
451                    warn!(
452                        "Received a key request from a device that changed \
453                         their Curve25519 sender key"
454                    );
455                } else {
456                    debug!(
457                        reason = ?e,
458                        "Received a key request that we won't serve",
459                    );
460                }
461
462                Ok(None)
463            }
464        }
465    }
466
467    #[cfg(feature = "automatic-room-key-forwarding")]
468    #[tracing::instrument(
469        skip_all,
470        fields(
471            user_id = ?event.sender,
472            device_id = ?event.content.requesting_device_id,
473            ?room_id,
474            session_id
475        )
476    )]
477    async fn handle_supported_key_request(
478        &self,
479        cache: &StoreCache,
480        event: &RoomKeyRequestEvent,
481        room_id: &RoomId,
482        session_id: &str,
483    ) -> OlmResult<Option<Session>> {
484        let session = self.inner.store.get_inbound_group_session(room_id, session_id).await?;
485
486        if let Some(s) = session {
487            self.answer_room_key_request(cache, event, &s).await
488        } else {
489            debug!("Received a room key request for an unknown inbound group session",);
490
491            Ok(None)
492        }
493    }
494
495    /// Handle a single incoming key request.
496    #[cfg(feature = "automatic-room-key-forwarding")]
497    async fn handle_key_request(
498        &self,
499        cache: &StoreCache,
500        event: &RoomKeyRequestEvent,
501    ) -> OlmResult<Option<Session>> {
502        use crate::types::events::room_key_request::{Action, RequestedKeyInfo};
503
504        if self.inner.room_key_forwarding_enabled.load(Ordering::SeqCst) {
505            match &event.content.action {
506                Action::Request(info) => match info {
507                    RequestedKeyInfo::MegolmV1AesSha2(i) => {
508                        self.handle_supported_key_request(cache, event, &i.room_id, &i.session_id)
509                            .await
510                    }
511                    #[cfg(feature = "experimental-algorithms")]
512                    RequestedKeyInfo::MegolmV2AesSha2(i) => {
513                        self.handle_supported_key_request(cache, event, &i.room_id, &i.session_id)
514                            .await
515                    }
516                    RequestedKeyInfo::Unknown(i) => {
517                        debug!(
518                            sender = ?event.sender,
519                            algorithm = ?i.algorithm,
520                            "Received a room key request for a unsupported algorithm"
521                        );
522                        Ok(None)
523                    }
524                },
525                // We ignore cancellations here since there's nothing to serve.
526                Action::Cancellation => Ok(None),
527            }
528        } else {
529            debug!(
530                sender = ?event.sender,
531                "Received a room key request, but room key forwarding has been turned off"
532            );
533            Ok(None)
534        }
535    }
536
537    async fn share_secret(
538        &self,
539        device: &Device,
540        content: SecretSendContent,
541    ) -> OlmResult<Session> {
542        let event_type = content.event_type();
543        let (used_session, content) = device.encrypt(event_type, content).await?;
544
545        let request = ToDeviceRequest::new(
546            device.user_id(),
547            device.device_id().to_owned(),
548            content.event_type(),
549            content.cast(),
550        );
551
552        let request = OutgoingRequest {
553            request_id: request.txn_id.clone(),
554            request: Arc::new(request.into()),
555        };
556        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
557
558        Ok(used_session)
559    }
560
561    #[cfg(feature = "automatic-room-key-forwarding")]
562    async fn forward_room_key(
563        &self,
564        session: &InboundGroupSession,
565        device: &Device,
566        message_index: Option<u32>,
567    ) -> OlmResult<Session> {
568        let (used_session, content) =
569            device.encrypt_room_key_for_forwarding(session.clone(), message_index).await?;
570
571        let request = ToDeviceRequest::new(
572            device.user_id(),
573            device.device_id().to_owned(),
574            content.event_type(),
575            content.cast(),
576        );
577
578        let request = OutgoingRequest {
579            request_id: request.txn_id.clone(),
580            request: Arc::new(request.into()),
581        };
582        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
583
584        Ok(used_session)
585    }
586
587    /// Check if it's ok to share a session with the given device.
588    ///
589    /// The logic for this is currently as follows:
590    ///
591    /// * Share the session in full, starting from the earliest known index, if
592    ///   the requesting device is our own, trusted (verified) device.
593    ///
594    /// * For other requesting devices, share only a limited session and only if
595    ///   we originally shared with that device because it was present when the
596    ///   message was initially sent. By limited, we mean that the session will
597    ///   not be shared in full, but only from the message index at that moment.
598    ///   Since this information is recorded in the outbound session, we need to
599    ///   have it for this to work.
600    ///
601    /// * In all other cases, refuse to share the session.
602    ///
603    /// # Arguments
604    ///
605    /// * `device` - The device that is requesting a session from us.
606    ///
607    /// * `session` - The session that was requested to be shared.
608    ///
609    /// # Return value
610    ///
611    /// A `Result` representing whether we should share the session:
612    ///
613    /// - `Ok(None)`: Should share the entire session, starting with the
614    ///   earliest known index.
615    /// - `Ok(Some(i))`: Should share the session, but only starting from index
616    ///   i.
617    /// - `Err(x)`: Should *refuse* to share the session. `x` is the reason for
618    ///   the refusal.
619    #[cfg(feature = "automatic-room-key-forwarding")]
620    async fn should_share_key(
621        &self,
622        device: &Device,
623        session: &InboundGroupSession,
624    ) -> Result<Option<u32>, super::KeyForwardDecision> {
625        use super::KeyForwardDecision;
626        use crate::olm::ShareState;
627
628        let outbound_session = self
629            .inner
630            .outbound_group_sessions
631            .get_or_load(session.room_id())
632            .await
633            .filter(|outgoing_session| outgoing_session.session_id() == session.session_id());
634
635        // If this is our own, verified device, we share the entire session from the
636        // earliest known index.
637        if device.user_id() == self.user_id() && device.is_verified() {
638            Ok(None)
639        // Otherwise, if the records show we previously shared with this device,
640        // we'll reshare the session from the index we previously shared
641        // at. For this, we need an outbound session because this
642        // information is recorded there.
643        } else if let Some(outbound) = outbound_session {
644            match outbound.is_shared_with(&device.inner) {
645                ShareState::Shared { message_index, olm_wedging_index: _ } => {
646                    Ok(Some(message_index))
647                }
648                ShareState::SharedButChangedSenderKey => Err(KeyForwardDecision::ChangedSenderKey),
649                ShareState::NotShared => Err(KeyForwardDecision::OutboundSessionNotShared),
650            }
651        // Otherwise, there's not enough info to decide if we can safely share
652        // the session.
653        } else if device.user_id() == self.user_id() {
654            Err(KeyForwardDecision::UntrustedDevice)
655        } else {
656            Err(KeyForwardDecision::MissingOutboundSession)
657        }
658    }
659
660    /// Check if it's ok, or rather if it makes sense to automatically request
661    /// a key from our other devices.
662    ///
663    /// # Arguments
664    ///
665    /// * `key_info` - The info of our key request containing information about
666    ///   the key we wish to request.
667    #[cfg(feature = "automatic-room-key-forwarding")]
668    async fn should_request_key(&self, key_info: &SecretInfo) -> Result<bool, CryptoStoreError> {
669        if self.inner.room_key_requests_enabled.load(Ordering::SeqCst) {
670            let request = self.inner.store.get_secret_request_by_info(key_info).await?;
671
672            // Don't send out duplicate requests, users can re-request them if they
673            // think a second request might succeed.
674            if request.is_none() {
675                let devices = self.inner.store.get_user_devices(self.user_id()).await?;
676
677                // Devices will only respond to key requests if the devices are
678                // verified, if the device isn't verified by us it's unlikely that
679                // we're verified by them either. Don't request keys if there isn't
680                // at least one verified device.
681                Ok(devices.is_any_verified())
682            } else {
683                Ok(false)
684            }
685        } else {
686            Ok(false)
687        }
688    }
689
690    /// Create a new outgoing key request for the key with the given session id.
691    ///
692    /// This will queue up a new to-device request and store the key info so
693    /// once we receive a forwarded room key we can check that it matches the
694    /// key we requested.
695    ///
696    /// This method will return a cancel request and a new key request if the
697    /// key was already requested, otherwise it will return just the key
698    /// request.
699    ///
700    /// # Arguments
701    ///
702    /// * `room_id` - The id of the room where the key is used in.
703    ///
704    /// * `event` - The event for which we would like to request the room key.
705    pub async fn request_key(
706        &self,
707        room_id: &RoomId,
708        event: &EncryptedEvent,
709    ) -> Result<(Option<OutgoingRequest>, OutgoingRequest), MegolmError> {
710        let secret_info =
711            event.room_key_info(room_id).ok_or(EventError::UnsupportedAlgorithm)?.into();
712
713        let request = self.inner.store.get_secret_request_by_info(&secret_info).await?;
714
715        if let Some(request) = request {
716            let cancel = request.to_cancellation(self.device_id());
717            let request = request.to_request(self.device_id());
718
719            Ok((Some(cancel), request))
720        } else {
721            let request = self.request_key_helper(secret_info).await?;
722
723            Ok((None, request))
724        }
725    }
726
727    /// Create outgoing secret requests for the given
728    pub fn request_missing_secrets(
729        own_user_id: &UserId,
730        secret_names: Vec<SecretName>,
731    ) -> Vec<GossipRequest> {
732        if !secret_names.is_empty() {
733            info!(?secret_names, "Creating new outgoing secret requests");
734
735            secret_names
736                .into_iter()
737                .map(|n| GossipRequest::from_secret_name(own_user_id.to_owned(), n))
738                .collect()
739        } else {
740            trace!("No secrets are missing from our store, not requesting them");
741            vec![]
742        }
743    }
744
745    async fn request_key_helper(
746        &self,
747        key_info: SecretInfo,
748    ) -> Result<OutgoingRequest, CryptoStoreError> {
749        let request = GossipRequest {
750            request_recipient: self.user_id().to_owned(),
751            request_id: TransactionId::new(),
752            info: key_info,
753            sent_out: false,
754        };
755
756        let outgoing_request = request.to_request(self.device_id());
757        self.save_outgoing_key_info(request).await?;
758
759        Ok(outgoing_request)
760    }
761
762    /// Create a new outgoing key request for the key with the given session id.
763    ///
764    /// This will queue up a new to-device request and store the key info so
765    /// once we receive a forwarded room key we can check that it matches the
766    /// key we requested.
767    ///
768    /// This does nothing if a request for this key has already been sent out.
769    ///
770    /// # Arguments
771    /// * `room_id` - The id of the room where the key is used in.
772    ///
773    /// * `event` - The event for which we would like to request the room key.
774    #[cfg(feature = "automatic-room-key-forwarding")]
775    pub async fn create_outgoing_key_request(
776        &self,
777        room_id: &RoomId,
778        event: &EncryptedEvent,
779    ) -> Result<bool, CryptoStoreError> {
780        if let Some(info) = event.room_key_info(room_id).map(|i| i.into()) {
781            if self.should_request_key(&info).await? {
782                // Size of the request_key_helper future should not impact this
783                // async fn since it is likely enough that this branch won't be
784                // entered.
785                Box::pin(self.request_key_helper(info)).await?;
786                return Ok(true);
787            }
788        }
789
790        Ok(false)
791    }
792
793    /// Save an outgoing key info.
794    async fn save_outgoing_key_info(&self, info: GossipRequest) -> Result<(), CryptoStoreError> {
795        let mut changes = Changes::default();
796        changes.key_requests.push(info);
797        self.inner.store.save_changes(changes).await?;
798
799        Ok(())
800    }
801
802    /// Delete the given outgoing key info.
803    async fn delete_key_info(&self, info: &GossipRequest) -> Result<(), CryptoStoreError> {
804        self.inner.store.delete_outgoing_secret_requests(&info.request_id).await
805    }
806
807    /// Mark the outgoing request as sent.
808    pub async fn mark_outgoing_request_as_sent(
809        &self,
810        id: &TransactionId,
811    ) -> Result<(), CryptoStoreError> {
812        let info = self.inner.store.get_outgoing_secret_requests(id).await?;
813
814        if let Some(mut info) = info {
815            trace!(
816                recipient = ?info.request_recipient,
817                request_type = info.request_type(),
818                request_id = ?info.request_id,
819                "Marking outgoing secret request as sent"
820            );
821            info.sent_out = true;
822            self.save_outgoing_key_info(info).await?;
823        }
824
825        self.inner.outgoing_requests.write().remove(id);
826
827        Ok(())
828    }
829
830    /// Mark the given outgoing key info as done.
831    ///
832    /// This will queue up a request cancellation.
833    async fn mark_as_done(&self, key_info: &GossipRequest) -> Result<(), CryptoStoreError> {
834        trace!(
835            recipient = ?key_info.request_recipient,
836            request_type = key_info.request_type(),
837            request_id = ?key_info.request_id,
838            "Successfully received a secret, removing the request"
839        );
840
841        self.inner.outgoing_requests.write().remove(&key_info.request_id);
842        // TODO return the key info instead of deleting it so the sync handler
843        // can delete it in one transaction.
844        self.delete_key_info(key_info).await?;
845
846        let request = key_info.to_cancellation(self.device_id());
847        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
848
849        Ok(())
850    }
851
852    async fn accept_secret(
853        &self,
854        secret: GossippedSecret,
855        changes: &mut Changes,
856    ) -> Result<(), CryptoStoreError> {
857        if secret.secret_name != SecretName::RecoveryKey {
858            match self.inner.store.import_secret(&secret).await {
859                Ok(_) => self.mark_as_done(&secret.gossip_request).await?,
860                // If this is a store error propagate it up the call stack.
861                Err(SecretImportError::Store(e)) => return Err(e),
862                // Otherwise warn that there was something wrong with the
863                // secret.
864                Err(e) => {
865                    warn!(
866                        secret_name = ?secret.secret_name,
867                        error = ?e,
868                        "Error while importing a secret"
869                    );
870                }
871            }
872        } else {
873            // We would need to fire out a request to figure out if this backup decryption
874            // key is the one that is used for the current backup and if the
875            // backup is trusted.
876            //
877            // So we put the secret into our inbox. Later users can inspect the contents of
878            // the inbox and decide if they want to activate the backup.
879            info!("Received a backup decryption key, storing it into the secret inbox.");
880            changes.secrets.push(secret);
881        }
882
883        Ok(())
884    }
885
886    async fn receive_secret(
887        &self,
888        cache: &StoreCache,
889        sender_key: Curve25519PublicKey,
890        secret: GossippedSecret,
891        changes: &mut Changes,
892    ) -> Result<(), CryptoStoreError> {
893        debug!("Received a m.secret.send event with a matching request");
894
895        if let Some(device) =
896            self.inner.store.get_device_from_curve_key(&secret.event.sender, sender_key).await?
897        {
898            // Only accept secrets from one of our own trusted devices.
899            if device.user_id() == self.user_id() && device.is_verified() {
900                self.accept_secret(secret, changes).await?;
901            } else {
902                warn!("Received a m.secret.send event from another user or from unverified device");
903            }
904        } else {
905            warn!("Received a m.secret.send event from an unknown device");
906
907            self.identity_manager()
908                .key_query_manager
909                .synced(cache)
910                .await?
911                .mark_user_as_changed(&secret.event.sender)
912                .await?;
913        }
914
915        Ok(())
916    }
917
918    #[instrument(skip_all, fields(sender_key, sender = ?event.sender, request_id = ?event.content.request_id, secret_name))]
919    pub async fn receive_secret_event(
920        &self,
921        cache: &StoreCache,
922        sender_key: Curve25519PublicKey,
923        event: &DecryptedSecretSendEvent,
924        changes: &mut Changes,
925    ) -> Result<Option<SecretName>, CryptoStoreError> {
926        debug!("Received a m.secret.send event");
927
928        let request_id = &event.content.request_id;
929
930        let name = if let Some(request) =
931            self.inner.store.get_outgoing_secret_requests(request_id).await?
932        {
933            match &request.info {
934                SecretInfo::KeyRequest(_) => {
935                    warn!("Received a m.secret.send event but the request was for a room key");
936
937                    None
938                }
939                SecretInfo::SecretRequest(secret_name) => {
940                    Span::current().record("secret_name", debug(secret_name));
941
942                    let secret_name = secret_name.to_owned();
943
944                    let secret = GossippedSecret {
945                        secret_name: secret_name.to_owned(),
946                        event: event.to_owned(),
947                        gossip_request: request,
948                    };
949
950                    self.receive_secret(cache, sender_key, secret, changes).await?;
951
952                    Some(secret_name)
953                }
954            }
955        } else {
956            warn!("Received a m.secret.send event, but no matching request was found");
957            None
958        };
959
960        Ok(name)
961    }
962
963    async fn accept_forwarded_room_key(
964        &self,
965        info: &GossipRequest,
966        sender_key: Curve25519PublicKey,
967        event: &DecryptedForwardedRoomKeyEvent,
968    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
969        match InboundGroupSession::try_from(event) {
970            Ok(session) => {
971                if self.inner.store.compare_group_session(&session).await?
972                    == SessionOrdering::Better
973                {
974                    self.mark_as_done(info).await?;
975
976                    info!(
977                        ?sender_key,
978                        claimed_sender_key = ?session.sender_key(),
979                        room_id = ?session.room_id(),
980                        session_id = session.session_id(),
981                        algorithm = ?session.algorithm(),
982                        "Received a forwarded room key",
983                    );
984
985                    Ok(Some(session))
986                } else {
987                    info!(
988                        ?sender_key,
989                        claimed_sender_key = ?session.sender_key(),
990                        room_id = ?session.room_id(),
991                        session_id = session.session_id(),
992                        algorithm = ?session.algorithm(),
993                        "Received a forwarded room key but we already have a better version of it",
994                    );
995
996                    Ok(None)
997                }
998            }
999            Err(e) => {
1000                warn!(?sender_key, "Couldn't create a group session from a received room key");
1001                Err(e.into())
1002            }
1003        }
1004    }
1005
1006    async fn should_accept_forward(
1007        &self,
1008        info: &GossipRequest,
1009        sender_key: Curve25519PublicKey,
1010    ) -> Result<bool, CryptoStoreError> {
1011        let device =
1012            self.inner.store.get_device_from_curve_key(&info.request_recipient, sender_key).await?;
1013
1014        if let Some(device) = device {
1015            Ok(device.user_id() == self.user_id() && device.is_verified())
1016        } else {
1017            Ok(false)
1018        }
1019    }
1020
1021    /// Receive a forwarded room key event that was sent using any of our
1022    /// supported content types.
1023    async fn receive_supported_keys(
1024        &self,
1025        sender_key: Curve25519PublicKey,
1026        event: &DecryptedForwardedRoomKeyEvent,
1027    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
1028        let Some(info) = event.room_key_info() else {
1029            warn!(
1030                sender_key = sender_key.to_base64(),
1031                algorithm = ?event.content.algorithm(),
1032                "Received a forwarded room key with an unsupported algorithm",
1033            );
1034            return Ok(None);
1035        };
1036
1037        let Some(request) =
1038            self.inner.store.get_secret_request_by_info(&info.clone().into()).await?
1039        else {
1040            warn!(
1041                sender_key = ?sender_key,
1042                room_id = ?info.room_id(),
1043                session_id = info.session_id(),
1044                sender_key = ?sender_key,
1045                algorithm = ?info.algorithm(),
1046                "Received a forwarded room key that we didn't request",
1047            );
1048            return Ok(None);
1049        };
1050
1051        if self.should_accept_forward(&request, sender_key).await? {
1052            self.accept_forwarded_room_key(&request, sender_key, event).await
1053        } else {
1054            warn!(
1055                ?sender_key,
1056                room_id = ?info.room_id(),
1057                session_id = info.session_id(),
1058                "Received a forwarded room key from an unknown device, or \
1059                 from a device that the key request recipient doesn't own",
1060            );
1061
1062            Ok(None)
1063        }
1064    }
1065
1066    /// Receive a forwarded room key event.
1067    pub async fn receive_forwarded_room_key(
1068        &self,
1069        sender_key: Curve25519PublicKey,
1070        event: &DecryptedForwardedRoomKeyEvent,
1071    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
1072        match event.content {
1073            ForwardedRoomKeyContent::MegolmV1AesSha2(_) => {
1074                self.receive_supported_keys(sender_key, event).await
1075            }
1076            #[cfg(feature = "experimental-algorithms")]
1077            ForwardedRoomKeyContent::MegolmV2AesSha2(_) => {
1078                self.receive_supported_keys(sender_key, event).await
1079            }
1080            ForwardedRoomKeyContent::Unknown(_) => {
1081                warn!(
1082                    sender = event.sender.as_str(),
1083                    sender_key = sender_key.to_base64(),
1084                    algorithm = ?event.content.algorithm(),
1085                    "Received a forwarded room key with an unsupported algorithm",
1086                );
1087
1088                Ok(None)
1089            }
1090        }
1091    }
1092}
1093
1094#[cfg(test)]
1095mod tests {
1096    use std::sync::Arc;
1097
1098    #[cfg(feature = "automatic-room-key-forwarding")]
1099    use assert_matches::assert_matches;
1100    use matrix_sdk_test::{async_test, message_like_event_content};
1101    use ruma::{
1102        device_id, event_id,
1103        events::{
1104            secret::request::{RequestAction, SecretName, ToDeviceSecretRequestEventContent},
1105            ToDeviceEvent as RumaToDeviceEvent,
1106        },
1107        room_id,
1108        serde::Raw,
1109        user_id, DeviceId, RoomId, UserId,
1110    };
1111    use tokio::sync::Mutex;
1112
1113    use super::GossipMachine;
1114    #[cfg(feature = "automatic-room-key-forwarding")]
1115    use crate::{
1116        gossiping::KeyForwardDecision,
1117        olm::OutboundGroupSession,
1118        store::{CryptoStore, DeviceChanges},
1119        types::requests::AnyOutgoingRequest,
1120        types::{
1121            events::{
1122                forwarded_room_key::ForwardedRoomKeyContent, olm_v1::AnyDecryptedOlmEvent,
1123                olm_v1::DecryptedOlmV1Event,
1124            },
1125            EventEncryptionAlgorithm,
1126        },
1127        EncryptionSettings,
1128    };
1129    use crate::{
1130        identities::{DeviceData, IdentityManager, LocalTrust},
1131        olm::{Account, PrivateCrossSigningIdentity},
1132        session_manager::GroupSessionCache,
1133        store::{Changes, CryptoStoreWrapper, MemoryStore, PendingChanges, Store},
1134        types::events::room::encrypted::{
1135            EncryptedEvent, EncryptedToDeviceEvent, RoomEncryptedEventContent,
1136        },
1137        verification::VerificationMachine,
1138    };
1139
    /// The user ID of Alice, the user that owns the machine under test.
    fn alice_id() -> &'static UserId {
        user_id!("@alice:example.org")
    }
1143
    /// The device ID of Alice's first device.
    fn alice_device_id() -> &'static DeviceId {
        device_id!("JLAFKJWSCS")
    }
1147
    /// The user ID of Bob.
    fn bob_id() -> &'static UserId {
        user_id!("@bob:example.org")
    }
1151
    /// The device ID of Bob's device.
    ///
    /// Note that this is the same ID string that `alice2_device_id` returns.
    fn bob_device_id() -> &'static DeviceId {
        device_id!("ILMLKASTES")
    }
1155
    /// The device ID of Alice's second device.
    ///
    /// Note that this is the same ID string that `bob_device_id` returns.
    fn alice2_device_id() -> &'static DeviceId {
        device_id!("ILMLKASTES")
    }
1159
    /// The ID of the room that is used by the tests.
    fn room_id() -> &'static RoomId {
        room_id!("!test:example.org")
    }
1163
    /// Create a new account for Alice's first device.
    fn account() -> Account {
        Account::with_device_id(alice_id(), alice_device_id())
    }
1167
    /// Create a new account for Bob's device.
    fn bob_account() -> Account {
        Account::with_device_id(bob_id(), bob_device_id())
    }
1171
    /// Create a new account for Alice's second device.
    fn alice_2_account() -> Account {
        Account::with_device_id(alice_id(), alice2_device_id())
    }
1175
1176    #[cfg(feature = "automatic-room-key-forwarding")]
1177    async fn gossip_machine_test_helper(user_id: &UserId) -> GossipMachine {
1178        let user_id = user_id.to_owned();
1179        let device_id = DeviceId::new();
1180
1181        let store = Arc::new(store_with_account_helper(&user_id, &device_id).await);
1182        let static_data = store.load_account().await.unwrap().unwrap().static_data;
1183        let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(alice_id())));
1184        let verification =
1185            VerificationMachine::new(static_data.clone(), identity.clone(), store.clone());
1186        let store = Store::new(static_data, identity, store, verification);
1187
1188        let session_cache = GroupSessionCache::new(store.clone());
1189        let identity_manager = IdentityManager::new(store.clone());
1190
1191        GossipMachine::new(store, identity_manager, session_cache, Default::default())
1192    }
1193
1194    #[cfg(feature = "automatic-room-key-forwarding")]
1195    async fn store_with_account_helper(
1196        user_id: &UserId,
1197        device_id: &DeviceId,
1198    ) -> CryptoStoreWrapper {
1199        // Properly create the store by first saving the own device and then the account
1200        // data.
1201        let account = Account::with_device_id(user_id, device_id);
1202        let device = DeviceData::from_account(&account);
1203        device.set_trust_state(LocalTrust::Verified);
1204
1205        let changes = Changes {
1206            devices: DeviceChanges { new: vec![device], ..Default::default() },
1207            ..Default::default()
1208        };
1209        let mem_store = MemoryStore::new();
1210        mem_store.save_changes(changes).await.unwrap();
1211        mem_store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();
1212
1213        CryptoStoreWrapper::new(user_id, device_id, mem_store)
1214    }
1215
1216    async fn get_machine_test_helper() -> GossipMachine {
1217        let user_id = alice_id().to_owned();
1218        let account = Account::with_device_id(&user_id, alice_device_id());
1219        let device = DeviceData::from_account(&account);
1220        let another_device =
1221            DeviceData::from_account(&Account::with_device_id(&user_id, alice2_device_id()));
1222
1223        let store =
1224            Arc::new(CryptoStoreWrapper::new(&user_id, account.device_id(), MemoryStore::new()));
1225        let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(alice_id())));
1226        let verification =
1227            VerificationMachine::new(account.static_data.clone(), identity.clone(), store.clone());
1228
1229        let store = Store::new(account.static_data().clone(), identity, store, verification);
1230        store.save_device_data(&[device, another_device]).await.unwrap();
1231        store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();
1232        let session_cache = GroupSessionCache::new(store.clone());
1233
1234        let identity_manager = IdentityManager::new(store.clone());
1235
1236        GossipMachine::new(store, identity_manager, session_cache, Default::default())
1237    }
1238
    /// Create a pair of gossip machines that are prepared for key share
    /// tests.
    ///
    /// The first machine is the one `get_machine_test_helper` creates, the
    /// second one is created for `other_machine_owner`. Both stores learn
    /// about the device of the other side, a group session pair is created
    /// using the account of the second machine, and the first machine creates
    /// an outgoing key request for an event that was encrypted with that
    /// group session.
    ///
    /// # Arguments
    ///
    /// * `other_machine_owner` - The user that owns the second machine.
    ///
    /// * `create_sessions` - If `true`, a pair of Olm sessions between the
    ///   two accounts is created and saved into the respective stores.
    ///
    /// * `algorithm` - The algorithm that is put into the
    ///   `EncryptionSettings` used to create the group session.
    ///
    /// Returns the first machine, the outbound group session, and the second
    /// machine, in this order.
    #[cfg(feature = "automatic-room-key-forwarding")]
    async fn machines_for_key_share_test_helper(
        other_machine_owner: &UserId,
        create_sessions: bool,
        algorithm: EventEncryptionAlgorithm,
    ) -> (GossipMachine, OutboundGroupSession, GossipMachine) {
        use crate::olm::SenderData;

        let alice_machine = get_machine_test_helper().await;
        let alice_device = DeviceData::from_account(
            &alice_machine.inner.store.cache().await.unwrap().account().await.unwrap(),
        );

        let bob_machine = gossip_machine_test_helper(other_machine_owner).await;

        let bob_device = DeviceData::from_account(
            #[allow(clippy::explicit_auto_deref)] // clippy's wrong
            &*bob_machine.inner.store.cache().await.unwrap().account().await.unwrap(),
        );

        // We need a trusted device, otherwise we won't request keys
        let second_device = DeviceData::from_account(&alice_2_account());
        second_device.set_trust_state(LocalTrust::Verified);
        bob_device.set_trust_state(LocalTrust::Verified);
        // Each store gets to know the device(s) of the other side.
        alice_machine.inner.store.save_device_data(&[bob_device, second_device]).await.unwrap();
        bob_machine.inner.store.save_device_data(&[alice_device.clone()]).await.unwrap();

        if create_sessions {
            // Create Olm sessions for our two accounts. Both accounts are
            // accessed through a store transaction of their machine, so the
            // transaction of the second machine is nested inside the
            // transaction of the first one.
            let (alice_session, bob_session) = alice_machine
                .inner
                .store
                .with_transaction(|mut atr| async {
                    let sessions = bob_machine
                        .inner
                        .store
                        .with_transaction(|mut btr| async {
                            let alice_account = atr.account().await?;
                            let bob_account = btr.account().await?;
                            let sessions =
                                alice_account.create_session_for_test_helper(bob_account).await;
                            Ok((btr, sessions))
                        })
                        .await?;
                    Ok((atr, sessions))
                })
                .await
                .unwrap();

            // Populate our stores with the Olm sessions.

            alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
            bob_machine.inner.store.save_sessions(&[bob_session]).await.unwrap();
        }

        // Create the Megolm session pair using the requested algorithm.
        let settings = EncryptionSettings { algorithm, ..Default::default() };
        let (group_session, inbound_group_session) = bob_machine
            .inner
            .store
            .static_account()
            .create_group_session_pair(room_id(), settings, SenderData::unknown())
            .await
            .unwrap();

        // Only the second machine gets the inbound group session.
        bob_machine
            .inner
            .store
            .save_inbound_group_sessions(&[inbound_group_session])
            .await
            .unwrap();

        // Encrypt an event with the group session, the first machine will
        // request the room key for this event.
        let content = group_session.encrypt("m.dummy", &message_like_event_content!({})).await;
        let event = wrap_encrypted_content(bob_machine.user_id(), content);

        // Alice wants to request the outbound group session from bob.
        assert!(
            alice_machine.create_outgoing_key_request(room_id(), &event,).await.unwrap(),
            "We should request a room key"
        );

        // Record that the group session has been shared with Alice's device.
        group_session
            .mark_shared_with(
                alice_device.user_id(),
                alice_device.device_id(),
                alice_device.curve25519_key().unwrap(),
            )
            .await;

        // Put the outbound session into bobs store.
        bob_machine.inner.outbound_group_sessions.insert(group_session.clone());

        (alice_machine, group_session, bob_machine)
    }
1332
1333    fn extract_content<'a>(
1334        recipient: &UserId,
1335        request: &'a crate::types::requests::OutgoingRequest,
1336    ) -> &'a Raw<ruma::events::AnyToDeviceEventContent> {
1337        request
1338            .request()
1339            .to_device()
1340            .expect("The request should be always a to-device request")
1341            .messages
1342            .get(recipient)
1343            .unwrap()
1344            .values()
1345            .next()
1346            .unwrap()
1347    }
1348
1349    fn wrap_encrypted_content(
1350        sender: &UserId,
1351        content: Raw<RoomEncryptedEventContent>,
1352    ) -> EncryptedEvent {
1353        let content = content.deserialize().unwrap();
1354
1355        EncryptedEvent {
1356            sender: sender.to_owned(),
1357            event_id: event_id!("$143273582443PhrSn:example.org").to_owned(),
1358            content,
1359            origin_server_ts: ruma::MilliSecondsSinceUnixEpoch::now(),
1360            unsigned: Default::default(),
1361            other: Default::default(),
1362        }
1363    }
1364
1365    fn request_to_event<C>(
1366        recipient: &UserId,
1367        sender: &UserId,
1368        request: &crate::types::requests::OutgoingRequest,
1369    ) -> crate::types::events::ToDeviceEvent<C>
1370    where
1371        C: crate::types::events::EventType
1372            + serde::de::DeserializeOwned
1373            + serde::ser::Serialize
1374            + std::fmt::Debug,
1375    {
1376        let content = extract_content(recipient, request);
1377        let content: C = content.deserialize_as().unwrap_or_else(|_| {
1378            panic!("We can always deserialize the to-device event content {content:?}")
1379        });
1380
1381        crate::types::events::ToDeviceEvent::new(sender.to_owned(), content)
1382    }
1383
1384    #[async_test]
1385    async fn test_create_machine() {
1386        let machine = get_machine_test_helper().await;
1387
1388        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1389    }
1390
1391    #[async_test]
1392    async fn test_re_request_keys() {
1393        let machine = get_machine_test_helper().await;
1394        let account = account();
1395
1396        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1397
1398        let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1399        let event = wrap_encrypted_content(machine.user_id(), content);
1400
1401        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1402        let (cancel, request) = machine.request_key(session.room_id(), &event).await.unwrap();
1403
1404        assert!(cancel.is_none());
1405
1406        machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1407
1408        let (cancel, _) = machine.request_key(session.room_id(), &event).await.unwrap();
1409
1410        assert!(cancel.is_some());
1411    }
1412
1413    #[async_test]
1414    #[cfg(feature = "automatic-room-key-forwarding")]
1415    async fn test_create_key_request() {
1416        let machine = get_machine_test_helper().await;
1417        let account = account();
1418        let second_account = alice_2_account();
1419        let alice_device = DeviceData::from_account(&second_account);
1420
1421        // We need a trusted device, otherwise we won't request keys
1422        alice_device.set_trust_state(LocalTrust::Verified);
1423        machine.inner.store.save_device_data(&[alice_device]).await.unwrap();
1424
1425        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1426        let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1427        let event = wrap_encrypted_content(machine.user_id(), content);
1428
1429        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1430        machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
1431        assert!(!machine.outgoing_to_device_requests().await.unwrap().is_empty());
1432        assert_eq!(machine.outgoing_to_device_requests().await.unwrap().len(), 1);
1433
1434        machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
1435
1436        let requests = machine.outgoing_to_device_requests().await.unwrap();
1437        assert_eq!(requests.len(), 1);
1438
1439        let request = &requests[0];
1440
1441        machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1442        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1443    }
1444
1445    /// We should *not* request keys if that has been disabled
1446    #[async_test]
1447    #[cfg(feature = "automatic-room-key-forwarding")]
1448    async fn test_create_key_request_requests_disabled() {
1449        let machine = get_machine_test_helper().await;
1450        let account = account();
1451        let second_account = alice_2_account();
1452        let alice_device = DeviceData::from_account(&second_account);
1453
1454        // We need a trusted device, otherwise we won't request keys
1455        alice_device.set_trust_state(LocalTrust::Verified);
1456        machine.inner.store.save_device_data(&[alice_device]).await.unwrap();
1457
1458        // Disable key requests
1459        assert!(machine.are_room_key_requests_enabled());
1460        machine.set_room_key_requests_enabled(false);
1461        assert!(!machine.are_room_key_requests_enabled());
1462
1463        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1464        let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1465        let event = wrap_encrypted_content(machine.user_id(), content);
1466
1467        // The outgoing to-device requests should be empty before and after
1468        // `create_outgoing_key_request`.
1469        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1470        machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
1471        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1472    }
1473
    /// A forwarded room key is only accepted if it is better than the version
    /// of the session we already have.
    ///
    /// The test receives the same session three times, exported at the
    /// indices 10, 15, and 0. The first and the last forward are accepted,
    /// the one at index 15 is rejected because the session at index 10 has
    /// been saved before.
    #[async_test]
    #[cfg(feature = "automatic-room-key-forwarding")]
    async fn test_receive_forwarded_key() {
        let machine = get_machine_test_helper().await;
        let account = account();

        let second_account = alice_2_account();
        let alice_device = DeviceData::from_account(&second_account);

        // We need a trusted device, otherwise we won't request keys
        alice_device.set_trust_state(LocalTrust::Verified);
        machine.inner.store.save_device_data(&[alice_device.clone()]).await.unwrap();

        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
        let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
        let room_event = wrap_encrypted_content(machine.user_id(), content);

        // Request the room key and mark the request as sent.
        machine.create_outgoing_key_request(session.room_id(), &room_event).await.unwrap();

        let requests = machine.outgoing_to_device_requests().await.unwrap();
        let request = &requests[0];
        let id = &request.request_id;

        machine.mark_outgoing_request_as_sent(id).await.unwrap();

        // The first forward contains the session exported at index 10.
        let export = session.export_at_index(10).await;

        let content: ForwardedRoomKeyContent = export.try_into().unwrap();

        let event = DecryptedOlmV1Event::new(
            alice_id(),
            alice_id(),
            alice_device.ed25519_key().unwrap(),
            None,
            content,
        );

        // We don't have the session yet.
        assert!(machine
            .inner
            .store
            .get_inbound_group_session(session.room_id(), session.session_id(),)
            .await
            .unwrap()
            .is_none());

        let first_session = machine
            .receive_forwarded_room_key(alice_device.curve25519_key().unwrap(), &event)
            .await
            .unwrap();
        let first_session = first_session.unwrap();

        assert_eq!(first_session.first_known_index(), 10);

        // Save the session so that later forwards are compared against it.
        machine.inner.store.save_inbound_group_sessions(&[first_session.clone()]).await.unwrap();

        // Get the cancel request.
        let id = machine
            .inner
            .outgoing_requests
            .read()
            .first_key_value()
            .map(|(_, r)| r.request_id.clone())
            .unwrap();
        machine.mark_outgoing_request_as_sent(&id).await.unwrap();

        // Request the key once more and mark the new request as sent.
        machine.create_outgoing_key_request(session.room_id(), &room_event).await.unwrap();

        let requests = machine.outgoing_to_device_requests().await.unwrap();
        let request = &requests[0];

        machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();

        // The second forward contains the session exported at index 15.
        let export = session.export_at_index(15).await;

        let content: ForwardedRoomKeyContent = export.try_into().unwrap();

        let event = DecryptedOlmV1Event::new(
            alice_id(),
            alice_id(),
            alice_device.ed25519_key().unwrap(),
            None,
            content,
        );

        let second_session = machine
            .receive_forwarded_room_key(alice_device.curve25519_key().unwrap(), &event)
            .await
            .unwrap();

        // It is rejected since we already have the session at index 10.
        assert!(second_session.is_none());

        // The third forward contains the session exported at index 0.
        let export = session.export_at_index(0).await;

        let content: ForwardedRoomKeyContent = export.try_into().unwrap();

        let event = DecryptedOlmV1Event::new(
            alice_id(),
            alice_id(),
            alice_device.ed25519_key().unwrap(),
            None,
            content,
        );

        let second_session = machine
            .receive_forwarded_room_key(alice_device.curve25519_key().unwrap(), &event)
            .await
            .unwrap();

        // This time the forward is accepted.
        assert_eq!(second_session.unwrap().first_known_index(), 0);
    }
1584
1585    #[async_test]
1586    #[cfg(feature = "automatic-room-key-forwarding")]
1587    async fn test_should_share_key() {
1588        let machine = get_machine_test_helper().await;
1589        let account = account();
1590
1591        let own_device =
1592            machine.inner.store.get_device(alice_id(), alice2_device_id()).await.unwrap().unwrap();
1593
1594        let (outbound, inbound) = account.create_group_session_pair_with_defaults(room_id()).await;
1595
1596        // We don't share keys with untrusted devices.
1597        assert_matches!(
1598            machine.should_share_key(&own_device, &inbound).await,
1599            Err(KeyForwardDecision::UntrustedDevice)
1600        );
1601        own_device.set_trust_state(LocalTrust::Verified);
1602        // Now we do want to share the keys.
1603        machine.should_share_key(&own_device, &inbound).await.unwrap();
1604
1605        let bob_device = DeviceData::from_account(&bob_account());
1606        machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1607
1608        let bob_device =
1609            machine.inner.store.get_device(bob_id(), bob_device_id()).await.unwrap().unwrap();
1610
1611        // We don't share sessions with other user's devices if no outbound
1612        // session was provided.
1613        assert_matches!(
1614            machine.should_share_key(&bob_device, &inbound).await,
1615            Err(KeyForwardDecision::MissingOutboundSession)
1616        );
1617
1618        let mut changes = Changes::default();
1619
1620        changes.outbound_group_sessions.push(outbound.clone());
1621        changes.inbound_group_sessions.push(inbound.clone());
1622        machine.inner.store.save_changes(changes).await.unwrap();
1623        machine.inner.outbound_group_sessions.insert(outbound.clone());
1624
1625        // We don't share sessions with other user's devices if the session
1626        // wasn't shared in the first place.
1627        assert_matches!(
1628            machine.should_share_key(&bob_device, &inbound).await,
1629            Err(KeyForwardDecision::OutboundSessionNotShared)
1630        );
1631
1632        bob_device.set_trust_state(LocalTrust::Verified);
1633
1634        // We don't share sessions with other user's devices if the session
1635        // wasn't shared in the first place even if the device is trusted.
1636        assert_matches!(
1637            machine.should_share_key(&bob_device, &inbound).await,
1638            Err(KeyForwardDecision::OutboundSessionNotShared)
1639        );
1640
1641        // We now share the session, since it was shared before.
1642        outbound
1643            .mark_shared_with(
1644                bob_device.user_id(),
1645                bob_device.device_id(),
1646                bob_device.curve25519_key().unwrap(),
1647            )
1648            .await;
1649        machine.should_share_key(&bob_device, &inbound).await.unwrap();
1650
1651        let (other_outbound, other_inbound) =
1652            account.create_group_session_pair_with_defaults(room_id()).await;
1653
1654        // But we don't share some other session that doesn't match our outbound
1655        // session.
1656        assert_matches!(
1657            machine.should_share_key(&bob_device, &other_inbound).await,
1658            Err(KeyForwardDecision::MissingOutboundSession)
1659        );
1660
1661        // Finally, let's ensure we don't share the session with a device that rotated
1662        // its curve25519 key.
1663        let bob_device = DeviceData::from_account(&bob_account());
1664        machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1665
1666        let bob_device =
1667            machine.inner.store.get_device(bob_id(), bob_device_id()).await.unwrap().unwrap();
1668        assert_matches!(
1669            machine.should_share_key(&bob_device, &inbound).await,
1670            Err(KeyForwardDecision::ChangedSenderKey)
1671        );
1672
1673        // Now let's encrypt some messages in another session to increment the message
1674        // index and then share it with our own untrusted device.
1675        own_device.set_trust_state(LocalTrust::Unset);
1676
1677        for _ in 1..=3 {
1678            other_outbound.encrypt_helper("foo".to_owned()).await;
1679        }
1680        other_outbound
1681            .mark_shared_with(
1682                own_device.user_id(),
1683                own_device.device_id(),
1684                own_device.curve25519_key().unwrap(),
1685            )
1686            .await;
1687
1688        machine.inner.outbound_group_sessions.insert(other_outbound.clone());
1689
1690        // Since our device is untrusted, we should share the session starting only from
1691        // the current index (at which the message was marked as shared). This
1692        // should be 3 since we encrypted 3 messages.
1693        assert_matches!(machine.should_share_key(&own_device, &other_inbound).await, Ok(Some(3)));
1694
1695        own_device.set_trust_state(LocalTrust::Verified);
1696
1697        // However once our device is trusted, we share the entire session.
1698        assert_matches!(machine.should_share_key(&own_device, &other_inbound).await, Ok(None));
1699    }
1700
1701    #[cfg(feature = "automatic-room-key-forwarding")]
1702    async fn test_key_share_cycle(algorithm: EventEncryptionAlgorithm) {
1703        let (alice_machine, group_session, bob_machine) =
1704            machines_for_key_share_test_helper(alice_id(), true, algorithm).await;
1705
1706        // Get the request and convert it into a event.
1707        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
1708        let request = &requests[0];
1709        let event = request_to_event(alice_id(), alice_id(), request);
1710
1711        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1712
1713        // Bob doesn't have any outgoing requests.
1714        assert!(bob_machine.inner.outgoing_requests.read().is_empty());
1715
1716        // Receive the room key request from alice.
1717        bob_machine.receive_incoming_key_request(&event);
1718
1719        {
1720            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
1721            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
1722        }
1723        // Now bob does have an outgoing request.
1724        assert!(!bob_machine.inner.outgoing_requests.read().is_empty());
1725
1726        // Get the request and convert it to a encrypted to-device event.
1727        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
1728        let request = &requests[0];
1729
1730        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), alice_id(), request);
1731        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1732
1733        // Check that alice doesn't have the session.
1734        assert!(alice_machine
1735            .inner
1736            .store
1737            .get_inbound_group_session(room_id(), group_session.session_id())
1738            .await
1739            .unwrap()
1740            .is_none());
1741
1742        let decrypted = alice_machine
1743            .inner
1744            .store
1745            .with_transaction(|mut tr| async {
1746                let res = tr
1747                    .account()
1748                    .await?
1749                    .decrypt_to_device_event(&alice_machine.inner.store, &event)
1750                    .await?;
1751                Ok((tr, res))
1752            })
1753            .await
1754            .unwrap();
1755
1756        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
1757            panic!("Invalid decrypted event type");
1758        };
1759
1760        let session = alice_machine
1761            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
1762            .await
1763            .unwrap();
1764        alice_machine.inner.store.save_inbound_group_sessions(&[session.unwrap()]).await.unwrap();
1765
1766        // Check that alice now does have the session.
1767        let session = alice_machine
1768            .inner
1769            .store
1770            .get_inbound_group_session(room_id(), group_session.session_id())
1771            .await
1772            .unwrap()
1773            .unwrap();
1774
1775        assert_eq!(session.session_id(), group_session.session_id())
1776    }
1777
1778    #[async_test]
1779    #[cfg(feature = "automatic-room-key-forwarding")]
1780    async fn test_reject_forward_from_another_user() {
1781        let (alice_machine, group_session, bob_machine) = machines_for_key_share_test_helper(
1782            bob_id(),
1783            true,
1784            EventEncryptionAlgorithm::MegolmV1AesSha2,
1785        )
1786        .await;
1787
1788        // Get the request and convert it into a event.
1789        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
1790        let request = &requests[0];
1791        let event = request_to_event(alice_id(), alice_id(), request);
1792
1793        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1794
1795        // Bob doesn't have any outgoing requests.
1796        assert!(bob_machine.inner.outgoing_requests.read().is_empty());
1797
1798        // Receive the room key request from alice.
1799        bob_machine.receive_incoming_key_request(&event);
1800        {
1801            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
1802            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
1803        }
1804        // Now bob does have an outgoing request.
1805        assert!(!bob_machine.inner.outgoing_requests.read().is_empty());
1806
1807        // Get the request and convert it to a encrypted to-device event.
1808        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
1809        let request = &requests[0];
1810
1811        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), bob_id(), request);
1812        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1813
1814        // Check that alice doesn't have the session.
1815        assert!(alice_machine
1816            .inner
1817            .store
1818            .get_inbound_group_session(room_id(), group_session.session_id())
1819            .await
1820            .unwrap()
1821            .is_none());
1822
1823        let decrypted = alice_machine
1824            .inner
1825            .store
1826            .with_transaction(|mut tr| async {
1827                let res = tr
1828                    .account()
1829                    .await?
1830                    .decrypt_to_device_event(&alice_machine.inner.store, &event)
1831                    .await?;
1832                Ok((tr, res))
1833            })
1834            .await
1835            .unwrap();
1836        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
1837            panic!("Invalid decrypted event type");
1838        };
1839
1840        let session = alice_machine
1841            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
1842            .await
1843            .unwrap();
1844
1845        assert!(session.is_none(), "We should not receive a room key from another user");
1846    }
1847
1848    #[async_test]
1849    #[cfg(feature = "automatic-room-key-forwarding")]
1850    async fn test_key_share_cycle_megolm_v1() {
1851        test_key_share_cycle(EventEncryptionAlgorithm::MegolmV1AesSha2).await;
1852    }
1853
1854    #[async_test]
1855    #[cfg(all(feature = "experimental-algorithms", feature = "automatic-room-key-forwarding"))]
1856    async fn test_key_share_cycle_megolm_v2() {
1857        test_key_share_cycle(EventEncryptionAlgorithm::MegolmV2AesSha2).await;
1858    }
1859
1860    #[async_test]
1861    async fn test_secret_share_cycle() {
1862        let alice_machine = get_machine_test_helper().await;
1863
1864        let mut second_account = alice_2_account();
1865        let alice_device = DeviceData::from_account(&second_account);
1866
1867        let bob_account = bob_account();
1868        let bob_device = DeviceData::from_account(&bob_account);
1869
1870        alice_machine.inner.store.save_device_data(&[alice_device.clone()]).await.unwrap();
1871
1872        // Create Olm sessions for our two accounts.
1873        let alice_session = alice_machine
1874            .inner
1875            .store
1876            .with_transaction(|mut tr| async {
1877                let alice_account = tr.account().await?;
1878                let (alice_session, _) =
1879                    alice_account.create_session_for_test_helper(&mut second_account).await;
1880                Ok((tr, alice_session))
1881            })
1882            .await
1883            .unwrap();
1884
1885        alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
1886
1887        let event = RumaToDeviceEvent {
1888            sender: bob_account.user_id().to_owned(),
1889            content: ToDeviceSecretRequestEventContent::new(
1890                RequestAction::Request(SecretName::CrossSigningMasterKey),
1891                bob_account.device_id().to_owned(),
1892                "request_id".into(),
1893            ),
1894        };
1895
1896        // No secret found
1897        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1898        alice_machine.receive_incoming_secret_request(&event);
1899        {
1900            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1901            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1902        }
1903        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1904
1905        // No device found
1906        alice_machine.inner.store.reset_cross_signing_identity().await;
1907        alice_machine.receive_incoming_secret_request(&event);
1908        {
1909            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1910            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1911        }
1912        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1913
1914        alice_machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1915
1916        // The device doesn't belong to us
1917        alice_machine.inner.store.reset_cross_signing_identity().await;
1918        alice_machine.receive_incoming_secret_request(&event);
1919        {
1920            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1921            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1922        }
1923        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1924
1925        let event = RumaToDeviceEvent {
1926            sender: alice_id().to_owned(),
1927            content: ToDeviceSecretRequestEventContent::new(
1928                RequestAction::Request(SecretName::CrossSigningMasterKey),
1929                second_account.device_id().into(),
1930                "request_id".into(),
1931            ),
1932        };
1933
1934        // The device isn't trusted
1935        alice_machine.receive_incoming_secret_request(&event);
1936        {
1937            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1938            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1939        }
1940        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1941
1942        // We need a trusted device, otherwise we won't serve secrets
1943        alice_device.set_trust_state(LocalTrust::Verified);
1944        alice_machine.inner.store.save_device_data(&[alice_device.clone()]).await.unwrap();
1945
1946        alice_machine.receive_incoming_secret_request(&event);
1947        {
1948            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1949            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1950        }
1951        assert!(!alice_machine.inner.outgoing_requests.read().is_empty());
1952    }
1953
1954    #[async_test]
1955    async fn test_secret_broadcasting() {
1956        use futures_util::{pin_mut, FutureExt};
1957        use ruma::api::client::to_device::send_event_to_device::v3::Response as ToDeviceResponse;
1958        use serde_json::value::to_raw_value;
1959        use tokio_stream::StreamExt;
1960
1961        use crate::{
1962            machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper,
1963            EncryptionSyncChanges,
1964        };
1965
1966        let alice_id = user_id!("@alice:localhost");
1967
1968        let (alice_machine, bob_machine) =
1969            get_machine_pair_with_setup_sessions_test_helper(alice_id, alice_id, false).await;
1970
1971        let key_requests = GossipMachine::request_missing_secrets(
1972            bob_machine.user_id(),
1973            vec![SecretName::RecoveryKey],
1974        );
1975        let mut changes = Changes::default();
1976        let request_id = key_requests[0].request_id.to_owned();
1977        changes.key_requests = key_requests;
1978        bob_machine.store().save_changes(changes).await.unwrap();
1979        for request in bob_machine.outgoing_requests().await.unwrap() {
1980            bob_machine
1981                .mark_request_as_sent(request.request_id(), &ToDeviceResponse::new())
1982                .await
1983                .unwrap();
1984        }
1985
1986        let event = RumaToDeviceEvent {
1987            sender: alice_machine.user_id().to_owned(),
1988            content: ToDeviceSecretRequestEventContent::new(
1989                RequestAction::Request(SecretName::RecoveryKey),
1990                bob_machine.device_id().to_owned(),
1991                request_id,
1992            ),
1993        };
1994
1995        let bob_device = alice_machine
1996            .get_device(alice_id, bob_machine.device_id(), None)
1997            .await
1998            .unwrap()
1999            .unwrap();
2000        let alice_device = bob_machine
2001            .get_device(alice_id, alice_machine.device_id(), None)
2002            .await
2003            .unwrap()
2004            .unwrap();
2005
2006        // We need a trusted device, otherwise we won't serve nor accept secrets.
2007        bob_device.set_trust_state(LocalTrust::Verified);
2008        alice_device.set_trust_state(LocalTrust::Verified);
2009        alice_machine.store().save_device_data(&[bob_device.inner]).await.unwrap();
2010        bob_machine.store().save_device_data(&[alice_device.inner]).await.unwrap();
2011
2012        let decryption_key = crate::store::BackupDecryptionKey::new().unwrap();
2013        alice_machine
2014            .backup_machine()
2015            .save_decryption_key(Some(decryption_key), None)
2016            .await
2017            .unwrap();
2018        alice_machine.inner.key_request_machine.receive_incoming_secret_request(&event);
2019        {
2020            let alice_cache = alice_machine.store().cache().await.unwrap();
2021            alice_machine
2022                .inner
2023                .key_request_machine
2024                .collect_incoming_key_requests(&alice_cache)
2025                .await
2026                .unwrap();
2027        }
2028
2029        let requests =
2030            alice_machine.inner.key_request_machine.outgoing_to_device_requests().await.unwrap();
2031
2032        assert_eq!(requests.len(), 1);
2033        let request = requests.first().expect("We should have an outgoing to-device request");
2034
2035        let event: EncryptedToDeviceEvent =
2036            request_to_event(bob_machine.user_id(), alice_machine.user_id(), request);
2037        let event = Raw::from_json(to_raw_value(&event).unwrap());
2038
2039        let stream = bob_machine.store().secrets_stream();
2040        pin_mut!(stream);
2041
2042        bob_machine
2043            .receive_sync_changes(EncryptionSyncChanges {
2044                to_device_events: vec![event],
2045                changed_devices: &Default::default(),
2046                one_time_keys_counts: &Default::default(),
2047                unused_fallback_keys: None,
2048                next_batch_token: None,
2049            })
2050            .await
2051            .unwrap();
2052
2053        stream.next().now_or_never().expect("The broadcaster should have sent out the secret");
2054    }
2055
2056    #[async_test]
2057    #[cfg(feature = "automatic-room-key-forwarding")]
2058    async fn test_key_share_cycle_without_session() {
2059        let (alice_machine, group_session, bob_machine) = machines_for_key_share_test_helper(
2060            alice_id(),
2061            false,
2062            EventEncryptionAlgorithm::MegolmV1AesSha2,
2063        )
2064        .await;
2065
2066        // Get the request and convert it into a event.
2067        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
2068        let request = &requests[0];
2069        let event = request_to_event(alice_id(), alice_id(), request);
2070
2071        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
2072
2073        // Bob doesn't have any outgoing requests.
2074        assert!(bob_machine.outgoing_to_device_requests().await.unwrap().is_empty());
2075        assert!(bob_machine.inner.users_for_key_claim.read().is_empty());
2076        assert!(bob_machine.inner.wait_queue.is_empty());
2077
2078        // Receive the room key request from alice.
2079        bob_machine.receive_incoming_key_request(&event);
2080        {
2081            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
2082            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
2083        }
2084        // Bob only has a keys claim request, since we're lacking a session
2085        assert_eq!(bob_machine.outgoing_to_device_requests().await.unwrap().len(), 1);
2086        assert_matches!(
2087            bob_machine.outgoing_to_device_requests().await.unwrap()[0].request(),
2088            AnyOutgoingRequest::KeysClaim(_)
2089        );
2090        assert!(!bob_machine.inner.users_for_key_claim.read().is_empty());
2091        assert!(!bob_machine.inner.wait_queue.is_empty());
2092
2093        let (alice_session, bob_session) = alice_machine
2094            .inner
2095            .store
2096            .with_transaction(|mut atr| async {
2097                let res = bob_machine
2098                    .inner
2099                    .store
2100                    .with_transaction(|mut btr| async {
2101                        let alice_account = atr.account().await?;
2102                        let bob_account = btr.account().await?;
2103                        let sessions =
2104                            alice_account.create_session_for_test_helper(bob_account).await;
2105                        Ok((btr, sessions))
2106                    })
2107                    .await?;
2108                Ok((atr, res))
2109            })
2110            .await
2111            .unwrap();
2112
2113        // We create a session now.
2114        alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
2115        bob_machine.inner.store.save_sessions(&[bob_session]).await.unwrap();
2116
2117        bob_machine.retry_keyshare(alice_id(), alice_device_id());
2118        assert!(bob_machine.inner.users_for_key_claim.read().is_empty());
2119        {
2120            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
2121            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
2122        }
2123        // Bob now has an outgoing requests.
2124        assert!(!bob_machine.outgoing_to_device_requests().await.unwrap().is_empty());
2125        assert!(bob_machine.inner.wait_queue.is_empty());
2126
2127        // Get the request and convert it to a encrypted to-device event.
2128        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
2129        let request = &requests[0];
2130
2131        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), alice_id(), request);
2132        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
2133
2134        // Check that alice doesn't have the session.
2135        assert!(alice_machine
2136            .inner
2137            .store
2138            .get_inbound_group_session(room_id(), group_session.session_id())
2139            .await
2140            .unwrap()
2141            .is_none());
2142
2143        let decrypted = alice_machine
2144            .inner
2145            .store
2146            .with_transaction(|mut tr| async {
2147                let res = tr
2148                    .account()
2149                    .await?
2150                    .decrypt_to_device_event(&alice_machine.inner.store, &event)
2151                    .await?;
2152                Ok((tr, res))
2153            })
2154            .await
2155            .unwrap();
2156
2157        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
2158            panic!("Invalid decrypted event type");
2159        };
2160
2161        let session = alice_machine
2162            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
2163            .await
2164            .unwrap();
2165        alice_machine.inner.store.save_inbound_group_sessions(&[session.unwrap()]).await.unwrap();
2166
2167        // Check that alice now does have the session.
2168        let session = alice_machine
2169            .inner
2170            .store
2171            .get_inbound_group_session(room_id(), group_session.session_id())
2172            .await
2173            .unwrap()
2174            .unwrap();
2175
2176        assert_eq!(session.session_id(), group_session.session_id())
2177    }
2178}