matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22    collections::{BTreeMap, HashSet},
23    io::{Cursor, Read, Write},
24    iter,
25    path::PathBuf,
26    str::FromStr,
27    sync::Arc,
28};
29
30use eyeball::{SharedObservable, Subscriber};
31use futures_core::Stream;
32use futures_util::{
33    future::try_join,
34    stream::{self, StreamExt},
35};
36#[cfg(feature = "experimental-send-custom-to-device")]
37use matrix_sdk_base::crypto::CollectStrategy;
38use matrix_sdk_base::{
39    StateStoreDataKey, StateStoreDataValue,
40    crypto::{
41        CrossSigningBootstrapRequests, OlmMachine,
42        store::types::{RoomKeyBundleInfo, RoomKeyInfo},
43        types::{
44            SignedKey,
45            requests::{
46                OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
47            },
48        },
49    },
50};
51use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
52use ruma::{
53    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
54    api::client::{
55        error::ErrorBody,
56        keys::{
57            get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
58            upload_signing_keys::v3::Request as UploadSigningKeysRequest,
59        },
60        message::send_message_event,
61        to_device::send_event_to_device::v3::{
62            Request as RumaToDeviceRequest, Response as ToDeviceResponse,
63        },
64        uiaa::{AuthData, UiaaInfo},
65    },
66    assign,
67    events::{
68        direct::DirectUserIdentifier,
69        room::{MediaSource, ThumbnailInfo},
70    },
71};
72#[cfg(feature = "experimental-send-custom-to-device")]
73use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
74use serde::{Deserialize, de::Error as _};
75use tasks::BundleReceiverTask;
76use tokio::sync::{Mutex, RwLockReadGuard};
77use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
78use tracing::{debug, error, instrument, trace, warn};
79use url::Url;
80use vodozemac::Curve25519PublicKey;
81
82use self::{
83    backups::{Backups, types::BackupClientState},
84    futures::UploadEncryptedFile,
85    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
86    recovery::{Recovery, RecoveryState},
87    secret_storage::SecretStorage,
88    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
89    verification::{SasVerification, Verification, VerificationRequest},
90};
91use crate::{
92    Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
93    attachment::Thumbnail,
94    client::{ClientInner, WeakClient},
95    cross_process_lock::CrossProcessLockGuard,
96    error::HttpResult,
97};
98
99pub mod backups;
100pub mod futures;
101pub mod identities;
102pub mod recovery;
103pub mod secret_storage;
104pub(crate) mod tasks;
105pub mod verification;
106
107pub use matrix_sdk_base::crypto::{
108    CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
109    MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
110    SessionCreationError, SignatureError, VERSION,
111    olm::{
112        SessionCreationError as MegolmSessionCreationError,
113        SessionExportError as OlmSessionExportError,
114    },
115    vodozemac,
116};
117
118#[cfg(feature = "experimental-send-custom-to-device")]
119use crate::config::RequestConfig;
120pub use crate::error::RoomKeyImportError;
121
122/// All the data related to the encryption state.
123pub(crate) struct EncryptionData {
124    /// Background tasks related to encryption (key backup, initialization
125    /// tasks, etc.).
126    pub tasks: StdMutex<ClientTasks>,
127
128    /// End-to-end encryption settings.
129    pub encryption_settings: EncryptionSettings,
130
131    /// All state related to key backup.
132    pub backup_state: BackupClientState,
133
134    /// All state related to secret storage recovery.
135    pub recovery_state: SharedObservable<RecoveryState>,
136}
137
138impl EncryptionData {
139    pub fn new(encryption_settings: EncryptionSettings) -> Self {
140        Self {
141            encryption_settings,
142
143            tasks: StdMutex::new(Default::default()),
144            backup_state: Default::default(),
145            recovery_state: Default::default(),
146        }
147    }
148
149    pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
150        let weak_client = WeakClient::from_inner(client);
151
152        let mut tasks = self.tasks.lock();
153        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
154
155        if self.encryption_settings.backup_download_strategy
156            == BackupDownloadStrategy::AfterDecryptionFailure
157        {
158            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
159        }
160    }
161
162    /// Initialize the background task which listens for changes in the
163    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
164    ///
165    /// This should happen after the usual tasks have been set up and after the
166    /// E2EE initialization tasks have been set up.
167    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
168        let mut guard = self.tasks.lock();
169
170        let future = Recovery::update_state_after_backup_state_change(client);
171        let join_handle = spawn(future);
172
173        guard.update_recovery_state_after_backup = Some(join_handle);
174    }
175}
176
177/// Settings for end-to-end encryption features.
178#[derive(Clone, Copy, Debug, Default)]
179pub struct EncryptionSettings {
180    /// Automatically bootstrap cross-signing for a user once they're logged, in
181    /// case it's not already done yet.
182    ///
183    /// This requires to login with a username and password, or that MSC3967 is
184    /// enabled on the server, as of 2023-10-20.
185    pub auto_enable_cross_signing: bool,
186
187    /// Select a strategy to download room keys from the backup, by default room
188    /// keys won't be downloaded from the backup automatically.
189    ///
190    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
191    pub backup_download_strategy: BackupDownloadStrategy,
192
193    /// Automatically create a backup version if no backup exists.
194    pub auto_enable_backups: bool,
195}
196
197/// Settings for end-to-end encryption features.
198#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
199#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
200pub enum BackupDownloadStrategy {
201    /// Automatically download all room keys from the backup when the backup
202    /// recovery key has been received. The backup recovery key can be received
203    /// in two ways:
204    ///
205    /// 1. Received as a `m.secret.send` to-device event, after a successful
206    ///    interactive verification.
207    /// 2. Imported from secret storage (4S) using the
208    ///    [`SecretStore::import_secrets()`] method.
209    ///
210    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
211    OneShot,
212
213    /// Attempt to download a single room key if an event fails to be decrypted.
214    AfterDecryptionFailure,
215
216    /// Don't download any room keys automatically. The user can manually
217    /// download room keys using the [`Backups::download_room_key()`] methods.
218    ///
219    /// This is the default option.
220    #[default]
221    Manual,
222}
223
224/// The verification state of our own device
225///
226/// This enum tells us if our own user identity trusts these devices, in other
227/// words it tells us if the user identity has signed the device.
228#[derive(Clone, Copy, Debug, Eq, PartialEq)]
229pub enum VerificationState {
230    /// The verification state is unknown for now.
231    Unknown,
232    /// The device is considered to be verified, it has been signed by its user
233    /// identity.
234    Verified,
235    /// The device is unverified.
236    Unverified,
237}
238
239/// Wraps together a `CrossProcessLockStoreGuard` and a generation number.
240#[derive(Debug)]
241pub struct CrossProcessLockStoreGuardWithGeneration {
242    _guard: CrossProcessLockGuard,
243    generation: u64,
244}
245
246impl CrossProcessLockStoreGuardWithGeneration {
247    /// Return the Crypto Store generation associated with this store lock.
248    pub fn generation(&self) -> u64 {
249        self.generation
250    }
251}
252
253/// A stateful struct remembering the cross-signing keys we need to upload.
254///
255/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
256/// additional authentication, this struct will contain information on the type
257/// of authentication the user needs to complete before the upload might be
258/// continued.
259///
260/// More info can be found in the [spec].
261///
262/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
263#[derive(Debug)]
264pub struct CrossSigningResetHandle {
265    client: Client,
266    upload_request: UploadSigningKeysRequest,
267    signatures_request: UploadSignaturesRequest,
268    auth_type: CrossSigningResetAuthType,
269    is_cancelled: Mutex<bool>,
270}
271
272impl CrossSigningResetHandle {
273    /// Set up a new `CrossSigningResetHandle`.
274    pub fn new(
275        client: Client,
276        upload_request: UploadSigningKeysRequest,
277        signatures_request: UploadSignaturesRequest,
278        auth_type: CrossSigningResetAuthType,
279    ) -> Self {
280        Self {
281            client,
282            upload_request,
283            signatures_request,
284            auth_type,
285            is_cancelled: Mutex::new(false),
286        }
287    }
288
289    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
290    /// is using.
291    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
292        &self.auth_type
293    }
294
295    /// Continue the cross-signing reset by either waiting for the
296    /// authentication to be done on the side of the OAuth 2.0 server or by
297    /// providing additional [`AuthData`] the homeserver requires.
298    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
299        let mut upload_request = self.upload_request.clone();
300        upload_request.auth = auth;
301
302        while let Err(e) = self.client.send(upload_request.clone()).await {
303            if *self.is_cancelled.lock().await {
304                return Ok(());
305            }
306
307            match e.as_uiaa_response() {
308                Some(uiaa_info) => {
309                    if uiaa_info.auth_error.is_some() {
310                        return Err(e.into());
311                    }
312                }
313                None => return Err(e.into()),
314            }
315        }
316
317        self.client.send(self.signatures_request.clone()).await?;
318
319        Ok(())
320    }
321
322    /// Cancel the ongoing identity reset process
323    pub async fn cancel(&self) {
324        *self.is_cancelled.lock().await = true;
325    }
326}
327
328/// information about the additional authentication that is required before the
329/// cross-signing keys can be uploaded.
330#[derive(Debug, Clone)]
331pub enum CrossSigningResetAuthType {
332    /// The homeserver requires user-interactive authentication.
333    Uiaa(UiaaInfo),
334    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
335    /// approve the upload of cross-signing keys.
336    OAuth(OAuthCrossSigningResetInfo),
337}
338
339impl CrossSigningResetAuthType {
340    fn new(error: &HttpError) -> Result<Option<Self>> {
341        if let Some(auth_info) = error.as_uiaa_response() {
342            if let Ok(auth_info) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
343                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
344            } else {
345                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
346            }
347        } else {
348            Ok(None)
349        }
350    }
351}
352
353/// OAuth 2.0 specific information about the required authentication for the
354/// upload of cross-signing keys.
355#[derive(Debug, Clone, Deserialize)]
356pub struct OAuthCrossSigningResetInfo {
357    /// The URL where the user can approve the reset of the cross-signing keys.
358    pub approval_url: Url,
359}
360
361impl OAuthCrossSigningResetInfo {
362    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Self> {
363        let parameters = serde_json::from_str::<OAuthCrossSigningResetUiaaParameters>(
364            auth_info.params.as_ref().map(|value| value.get()).unwrap_or_default(),
365        )?;
366
367        Ok(OAuthCrossSigningResetInfo { approval_url: parameters.reset.url })
368    }
369}
370
371/// The parsed `parameters` part of a [`ruma::api::client::uiaa::UiaaInfo`]
372/// response
373#[derive(Debug, Deserialize)]
374struct OAuthCrossSigningResetUiaaParameters {
375    /// The URL where the user can approve the reset of the cross-signing keys.
376    #[serde(rename = "org.matrix.cross_signing_reset")]
377    reset: OAuthCrossSigningResetUiaaResetParameter,
378}
379
380/// The `org.matrix.cross_signing_reset` part of the Uiaa response `parameters``
381/// dictionary.
382#[derive(Debug, Deserialize)]
383struct OAuthCrossSigningResetUiaaResetParameter {
384    /// The URL where the user can approve the reset of the cross-signing keys.
385    url: Url,
386}
387
388/// A struct that helps to parse the custom error message Synapse posts if a
389/// duplicate one-time key is uploaded.
390#[derive(Debug)]
391struct DuplicateOneTimeKeyErrorMessage {
392    /// The previously uploaded one-time key.
393    old_key: Curve25519PublicKey,
394    /// The one-time key we're attempting to upload right now.
395    new_key: Curve25519PublicKey,
396}
397
398impl FromStr for DuplicateOneTimeKeyErrorMessage {
399    type Err = serde_json::Error;
400
401    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
402        // First we split the string into two parts, the part containing the old key and
403        // the part containing the new key. The parts are conveniently separated
404        // by a `;` character.
405        let mut split = s.split_terminator(';');
406
407        let old_key = split
408            .next()
409            .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
410        let new_key = split
411            .next()
412            .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
413
414        // Now we remove the lengthy prefix from the part containing the old key, we
415        // should be left with just the JSON of the signed key.
416        let old_key_index = old_key
417            .find("Old key:")
418            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
419
420        let old_key = old_key[old_key_index..]
421            .trim()
422            .strip_prefix("Old key:")
423            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
424
425        // The part containing the new key is much simpler, we just remove a static
426        // prefix.
427        let new_key = new_key
428            .trim()
429            .strip_prefix("new key:")
430            .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
431
432        // The JSON containing the new key is for some reason quoted using single
433        // quotes, so let's replace them with normal double quotes.
434        let new_key = new_key.replace("'", "\"");
435
436        // Let's deserialize now.
437        let old_key: SignedKey = serde_json::from_str(old_key)?;
438        let new_key: SignedKey = serde_json::from_str(&new_key)?;
439
440        // Pick out the Curve keys, we don't care about the rest that much.
441        let old_key = old_key.key();
442        let new_key = new_key.key();
443
444        Ok(Self { old_key, new_key })
445    }
446}
447
448impl Client {
449    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
450        self.base_client().olm_machine().await
451    }
452
453    pub(crate) async fn mark_request_as_sent(
454        &self,
455        request_id: &TransactionId,
456        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
457    ) -> Result<(), matrix_sdk_base::Error> {
458        Ok(self
459            .olm_machine()
460            .await
461            .as_ref()
462            .expect(
463                "We should have an olm machine once we try to mark E2EE related requests as sent",
464            )
465            .mark_request_as_sent(request_id, response)
466            .await?)
467    }
468
469    /// Query the server for users device keys.
470    ///
471    /// # Panics
472    ///
473    /// Panics if no key query needs to be done.
474    #[instrument(skip(self, device_keys))]
475    pub(crate) async fn keys_query(
476        &self,
477        request_id: &TransactionId,
478        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
479    ) -> Result<get_keys::v3::Response> {
480        let request = assign!(get_keys::v3::Request::new(), { device_keys });
481
482        let response = self.send(request).await?;
483        self.mark_request_as_sent(request_id, &response).await?;
484        self.encryption().update_state_after_keys_query(&response).await;
485
486        Ok(response)
487    }
488
489    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
490    /// encrypting and uploading a provided reader.
491    ///
492    /// # Arguments
493    ///
494    /// * `content_type` - The content type of the file.
495    /// * `reader` - The reader that should be encrypted and uploaded.
496    ///
497    /// # Examples
498    ///
499    /// ```no_run
500    /// # use matrix_sdk::Client;
501    /// # use url::Url;
502    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
503    /// use serde::{Deserialize, Serialize};
504    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
505    ///
506    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
507    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
508    /// struct CustomEventContent {
509    ///     encrypted_file: EncryptedFile,
510    /// }
511    ///
512    /// # async {
513    /// # let homeserver = Url::parse("http://example.com")?;
514    /// # let client = Client::new(homeserver).await?;
515    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
516    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
517    /// let encrypted_file = client.upload_encrypted_file(&mut reader).await?;
518    ///
519    /// room.send(CustomEventContent { encrypted_file }).await?;
520    /// # anyhow::Ok(()) };
521    /// ```
522    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
523        &'a self,
524        reader: &'a mut R,
525    ) -> UploadEncryptedFile<'a, R> {
526        UploadEncryptedFile::new(self, reader)
527    }
528
529    /// Encrypt and upload the file and thumbnails, and return the source
530    /// information.
531    pub(crate) async fn upload_encrypted_media_and_thumbnail(
532        &self,
533        data: &[u8],
534        thumbnail: Option<Thumbnail>,
535        send_progress: SharedObservable<TransmissionProgress>,
536    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
537        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
538
539        let upload_attachment = async {
540            let mut cursor = Cursor::new(data);
541            self.upload_encrypted_file(&mut cursor)
542                .with_send_progress_observable(send_progress)
543                .await
544        };
545
546        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
547
548        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
549    }
550
551    /// Uploads an encrypted thumbnail to the media repository, and returns
552    /// its source and extra information.
553    async fn upload_encrypted_thumbnail(
554        &self,
555        thumbnail: Option<Thumbnail>,
556        send_progress: SharedObservable<TransmissionProgress>,
557    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
558        let Some(thumbnail) = thumbnail else {
559            return Ok(None);
560        };
561
562        let (data, _, thumbnail_info) = thumbnail.into_parts();
563        let mut cursor = Cursor::new(data);
564
565        let file = self
566            .upload_encrypted_file(&mut cursor)
567            .with_send_progress_observable(send_progress)
568            .await?;
569
570        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
571    }
572
573    /// Claim one-time keys creating new Olm sessions.
574    ///
575    /// # Arguments
576    ///
577    /// * `users` - The list of user/device pairs that we should claim keys for.
578    pub(crate) async fn claim_one_time_keys(
579        &self,
580        users: impl Iterator<Item = &UserId>,
581    ) -> Result<()> {
582        let _lock = self.locks().key_claim_lock.lock().await;
583
584        if let Some((request_id, request)) = self
585            .olm_machine()
586            .await
587            .as_ref()
588            .ok_or(Error::NoOlmMachine)?
589            .get_missing_sessions(users)
590            .await?
591        {
592            let response = self.send(request).await?;
593            self.mark_request_as_sent(&request_id, &response).await?;
594        }
595
596        Ok(())
597    }
598
599    /// Upload the E2E encryption keys.
600    ///
601    /// This uploads the long lived device keys as well as the required amount
602    /// of one-time keys.
603    ///
604    /// # Panics
605    ///
606    /// Panics if the client isn't logged in, or if no encryption keys need to
607    /// be uploaded.
608    #[instrument(skip(self, request))]
609    pub(crate) async fn keys_upload(
610        &self,
611        request_id: &TransactionId,
612        request: &upload_keys::v3::Request,
613    ) -> Result<upload_keys::v3::Response> {
614        debug!(
615            device_keys = request.device_keys.is_some(),
616            one_time_key_count = request.one_time_keys.len(),
617            "Uploading public encryption keys",
618        );
619
620        let response = self.send(request.clone()).await?;
621        self.mark_request_as_sent(request_id, &response).await?;
622
623        Ok(response)
624    }
625
626    pub(crate) async fn room_send_helper(
627        &self,
628        request: &RoomMessageRequest,
629    ) -> Result<send_message_event::v3::Response> {
630        let content = request.content.clone();
631        let txn_id = request.txn_id.clone();
632        let room_id = &request.room_id;
633
634        self.get_room(room_id)
635            .expect("Can't send a message to a room that isn't known to the store")
636            .send(*content)
637            .with_transaction_id(txn_id)
638            .await
639    }
640
641    pub(crate) async fn send_to_device(
642        &self,
643        request: &ToDeviceRequest,
644    ) -> HttpResult<ToDeviceResponse> {
645        let request = RumaToDeviceRequest::new_raw(
646            request.event_type.clone(),
647            request.txn_id.clone(),
648            request.messages.clone(),
649        );
650
651        self.send(request).await
652    }
653
654    pub(crate) async fn send_verification_request(
655        &self,
656        request: OutgoingVerificationRequest,
657    ) -> Result<()> {
658        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
659
660        match request {
661            ToDevice(t) => {
662                self.send_to_device(&t).await?;
663            }
664            InRoom(r) => {
665                self.room_send_helper(&r).await?;
666            }
667        }
668
669        Ok(())
670    }
671
672    /// Get the existing DM room with the given user, if any.
673    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
674        let rooms = self.joined_rooms();
675
676        // Find the room we share with the `user_id` and only with `user_id`
677        let room = rooms.into_iter().find(|r| {
678            let targets = r.direct_targets();
679            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
680        });
681
682        trace!(?room, "Found room");
683        room
684    }
685
686    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
687        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
688
689        match r.request() {
690            AnyOutgoingRequest::KeysQuery(request) => {
691                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
692            }
693            AnyOutgoingRequest::KeysUpload(request) => {
694                let response = self.keys_upload(r.request_id(), request).await;
695
696                if let Err(e) = &response {
697                    match e.as_ruma_api_error() {
698                        Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
699                            if let ErrorBody::Standard { message, .. } = &e.body {
700                                // This is one of the nastiest errors we can have. The server
701                                // telling us that we already have a one-time key uploaded means
702                                // that we forgot about some of our one-time keys. This will lead to
703                                // UTDs.
704                                {
705                                    let already_reported = self
706                                        .state_store()
707                                        .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
708                                        .await?
709                                        .is_some();
710
711                                    if message.starts_with("One time key") && !already_reported {
712                                        if let Ok(message) =
713                                            DuplicateOneTimeKeyErrorMessage::from_str(message)
714                                        {
715                                            error!(
716                                                sentry = true,
717                                                old_key = %message.old_key,
718                                                new_key = %message.new_key,
719                                                "Duplicate one-time keys have been uploaded"
720                                            );
721                                        } else {
722                                            error!(
723                                                sentry = true,
724                                                "Duplicate one-time keys have been uploaded"
725                                            );
726                                        }
727
728                                        self.state_store()
729                                            .set_kv_data(
730                                                StateStoreDataKey::OneTimeKeyAlreadyUploaded,
731                                                StateStoreDataValue::OneTimeKeyAlreadyUploaded,
732                                            )
733                                            .await?;
734                                    }
735                                }
736                            }
737                        }
738                        _ => {}
739                    }
740
741                    response?;
742                }
743            }
744            AnyOutgoingRequest::ToDeviceRequest(request) => {
745                let response = self.send_to_device(request).await?;
746                self.mark_request_as_sent(r.request_id(), &response).await?;
747            }
748            AnyOutgoingRequest::SignatureUpload(request) => {
749                let response = self.send(request.clone()).await?;
750                self.mark_request_as_sent(r.request_id(), &response).await?;
751            }
752            AnyOutgoingRequest::RoomMessage(request) => {
753                let response = self.room_send_helper(request).await?;
754                self.mark_request_as_sent(r.request_id(), &response).await?;
755            }
756            AnyOutgoingRequest::KeysClaim(request) => {
757                let response = self.send(request.clone()).await?;
758                self.mark_request_as_sent(r.request_id(), &response).await?;
759            }
760        }
761
762        Ok(())
763    }
764
765    #[instrument(skip_all)]
766    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
767        const MAX_CONCURRENT_REQUESTS: usize = 20;
768
769        // This is needed because sometimes we need to automatically
770        // claim some one-time keys to unwedge an existing Olm session.
771        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
772            warn!("Error while claiming one-time keys {:?}", e);
773        }
774
775        let outgoing_requests = stream::iter(
776            self.olm_machine()
777                .await
778                .as_ref()
779                .ok_or(Error::NoOlmMachine)?
780                .outgoing_requests()
781                .await?,
782        )
783        .map(|r| self.send_outgoing_request(r));
784
785        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
786
787        requests
788            .for_each(|r| async move {
789                match r {
790                    Ok(_) => (),
791                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
792                }
793            })
794            .await;
795
796        Ok(())
797    }
798}
799
800#[cfg(any(feature = "testing", test))]
801impl Client {
802    /// Get the olm machine, for testing purposes only.
803    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
804        self.olm_machine().await
805    }
806}
807
808/// A high-level API to manage the client's encryption.
809///
810/// To get this, use [`Client::encryption()`].
811#[derive(Debug, Clone)]
812pub struct Encryption {
813    /// The underlying client.
814    client: Client,
815}
816
817impl Encryption {
818    pub(crate) fn new(client: Client) -> Self {
819        Self { client }
820    }
821
822    /// Returns the current encryption settings for this client.
823    pub(crate) fn settings(&self) -> EncryptionSettings {
824        self.client.inner.e2ee.encryption_settings
825    }
826
827    /// Get the public ed25519 key of our own device. This is usually what is
828    /// called the fingerprint of the device.
829    pub async fn ed25519_key(&self) -> Option<String> {
830        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
831    }
832
833    /// Get the public Curve25519 key of our own device.
834    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
835        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
836    }
837
838    /// Get the current device creation timestamp.
839    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
840        match self.get_own_device().await {
841            Ok(Some(device)) => device.first_time_seen_ts(),
842            // Should not happen, there should always be an own device
843            _ => MilliSecondsSinceUnixEpoch::now(),
844        }
845    }
846
847    pub(crate) async fn import_secrets_bundle(
848        &self,
849        bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
850    ) -> Result<(), SecretImportError> {
851        let olm_machine = self.client.olm_machine().await;
852        let olm_machine =
853            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
854
855        olm_machine.store().import_secrets_bundle(bundle).await
856    }
857
858    /// Get the status of the private cross signing keys.
859    ///
860    /// This can be used to check which private cross signing keys we have
861    /// stored locally.
862    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
863        let olm = self.client.olm_machine().await;
864        let machine = olm.as_ref()?;
865        Some(machine.cross_signing_status().await)
866    }
867
868    /// Does the user have other devices that the current device can verify
869    /// against?
870    ///
871    /// The device must be signed by the user's cross-signing key, must have an
872    /// identity, and must not be a dehydrated device.
873    pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
874        let olm_machine = self.client.olm_machine().await;
875        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
876        let user_id = olm_machine.user_id();
877
878        self.ensure_initial_key_query().await?;
879
880        let devices = self.get_user_devices(user_id).await?;
881
882        let ret = devices.devices().any(|device| {
883            device.is_cross_signed_by_owner()
884                && device.curve25519_key().is_some()
885                && !device.is_dehydrated()
886        });
887
888        Ok(ret)
889    }
890
891    /// Get all the tracked users we know about
892    ///
893    /// Tracked users are users for which we keep the device list of E2EE
894    /// capable devices up to date.
895    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
896        if let Some(machine) = self.client.olm_machine().await.as_ref() {
897            machine.tracked_users().await
898        } else {
899            Ok(HashSet::new())
900        }
901    }
902
903    /// Get a [`Subscriber`] for the [`VerificationState`].
904    ///
905    /// # Examples
906    ///
907    /// ```no_run
908    /// use matrix_sdk::{Client, encryption};
909    /// use url::Url;
910    ///
911    /// # async {
912    /// let homeserver = Url::parse("http://example.com")?;
913    /// let client = Client::new(homeserver).await?;
914    /// let mut subscriber = client.encryption().verification_state();
915    ///
916    /// let current_value = subscriber.get();
917    ///
918    /// println!("The current verification state is: {current_value:?}");
919    ///
920    /// if let Some(verification_state) = subscriber.next().await {
921    ///     println!("Received verification state update {:?}", verification_state)
922    /// }
923    /// # anyhow::Ok(()) };
924    /// ```
925    pub fn verification_state(&self) -> Subscriber<VerificationState> {
926        self.client.inner.verification_state.subscribe_reset()
927    }
928
929    /// Get a verification object with the given flow id.
930    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
931        let olm = self.client.olm_machine().await;
932        let olm = olm.as_ref()?;
933        #[allow(clippy::bind_instead_of_map)]
934        olm.get_verification(user_id, flow_id).and_then(|v| match v {
935            matrix_sdk_base::crypto::Verification::SasV1(sas) => {
936                Some(SasVerification { inner: sas, client: self.client.clone() }.into())
937            }
938            #[cfg(feature = "qrcode")]
939            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
940                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
941            }
942            _ => None,
943        })
944    }
945
946    /// Get a `VerificationRequest` object for the given user with the given
947    /// flow id.
948    pub async fn get_verification_request(
949        &self,
950        user_id: &UserId,
951        flow_id: impl AsRef<str>,
952    ) -> Option<VerificationRequest> {
953        let olm = self.client.olm_machine().await;
954        let olm = olm.as_ref()?;
955
956        olm.get_verification_request(user_id, flow_id)
957            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
958    }
959
960    /// Get a specific device of a user.
961    ///
962    /// # Arguments
963    ///
964    /// * `user_id` - The unique id of the user that the device belongs to.
965    ///
966    /// * `device_id` - The unique id of the device.
967    ///
968    /// Returns a `Device` if one is found and the crypto store didn't throw an
969    /// error.
970    ///
971    /// This will always return None if the client hasn't been logged in.
972    ///
973    /// # Examples
974    ///
975    /// ```no_run
976    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
977    /// # use url::Url;
978    /// # async {
979    /// # let alice = user_id!("@alice:example.org");
980    /// # let homeserver = Url::parse("http://example.com")?;
981    /// # let client = Client::new(homeserver).await?;
982    /// if let Some(device) =
983    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
984    /// {
985    ///     println!("{:?}", device.is_verified());
986    ///
987    ///     if !device.is_verified() {
988    ///         let verification = device.request_verification().await?;
989    ///     }
990    /// }
991    /// # anyhow::Ok(()) };
992    /// ```
993    pub async fn get_device(
994        &self,
995        user_id: &UserId,
996        device_id: &DeviceId,
997    ) -> Result<Option<Device>, CryptoStoreError> {
998        let olm = self.client.olm_machine().await;
999        let Some(machine) = olm.as_ref() else { return Ok(None) };
1000        let device = machine.get_device(user_id, device_id, None).await?;
1001        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1002    }
1003
1004    /// A convenience method to retrieve your own device from the store.
1005    ///
1006    /// This is the same as calling [`Encryption::get_device()`] with your own
1007    /// user and device ID.
1008    ///
1009    /// This will always return a device, unless you are not logged in.
1010    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
1011        let olm = self.client.olm_machine().await;
1012        let Some(machine) = olm.as_ref() else { return Ok(None) };
1013        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
1014        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1015    }
1016
1017    /// Get a map holding all the devices of an user.
1018    ///
1019    /// This will always return an empty map if the client hasn't been logged
1020    /// in.
1021    ///
1022    /// # Arguments
1023    ///
1024    /// * `user_id` - The unique id of the user that the devices belong to.
1025    ///
1026    /// # Examples
1027    ///
1028    /// ```no_run
1029    /// # use matrix_sdk::{Client, ruma::user_id};
1030    /// # use url::Url;
1031    /// # async {
1032    /// # let alice = user_id!("@alice:example.org");
1033    /// # let homeserver = Url::parse("http://example.com")?;
1034    /// # let client = Client::new(homeserver).await?;
1035    /// let devices = client.encryption().get_user_devices(alice).await?;
1036    ///
1037    /// for device in devices.devices() {
1038    ///     println!("{device:?}");
1039    /// }
1040    /// # anyhow::Ok(()) };
1041    /// ```
1042    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1043        let devices = self
1044            .client
1045            .olm_machine()
1046            .await
1047            .as_ref()
1048            .ok_or(Error::NoOlmMachine)?
1049            .get_user_devices(user_id, None)
1050            .await?;
1051
1052        Ok(UserDevices { inner: devices, client: self.client.clone() })
1053    }
1054
1055    /// Get the E2EE identity of a user from the crypto store.
1056    ///
1057    /// Usually, we only have the E2EE identity of a user locally if the user
1058    /// is tracked, meaning that we are both members of the same encrypted room.
1059    ///
1060    /// To get the E2EE identity of a user even if it is not available locally
1061    /// use [`Encryption::request_user_identity()`].
1062    ///
1063    /// # Arguments
1064    ///
1065    /// * `user_id` - The unique id of the user that the identity belongs to.
1066    ///
1067    /// Returns a `UserIdentity` if one is found and the crypto store
1068    /// didn't throw an error.
1069    ///
1070    /// This will always return None if the client hasn't been logged in.
1071    ///
1072    /// # Examples
1073    ///
1074    /// ```no_run
1075    /// # use matrix_sdk::{Client, ruma::user_id};
1076    /// # use url::Url;
1077    /// # async {
1078    /// # let alice = user_id!("@alice:example.org");
1079    /// # let homeserver = Url::parse("http://example.com")?;
1080    /// # let client = Client::new(homeserver).await?;
1081    /// let user = client.encryption().get_user_identity(alice).await?;
1082    ///
1083    /// if let Some(user) = user {
1084    ///     println!("{:?}", user.is_verified());
1085    ///
1086    ///     let verification = user.request_verification().await?;
1087    /// }
1088    /// # anyhow::Ok(()) };
1089    /// ```
1090    pub async fn get_user_identity(
1091        &self,
1092        user_id: &UserId,
1093    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1094        let olm = self.client.olm_machine().await;
1095        let Some(olm) = olm.as_ref() else { return Ok(None) };
1096        let identity = olm.get_identity(user_id, None).await?;
1097
1098        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1099    }
1100
1101    /// Get the E2EE identity of a user from the homeserver.
1102    ///
1103    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
1104    /// E2EE identity is not found, it should mean that the user did not set
1105    /// up cross-signing.
1106    ///
1107    /// If you want the E2EE identity of a user without making a request to the
1108    /// homeserver, use [`Encryption::get_user_identity()`] instead.
1109    ///
1110    /// # Arguments
1111    ///
1112    /// * `user_id` - The ID of the user that the identity belongs to.
1113    ///
1114    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
1115    /// was an issue with the crypto store or with the request to the
1116    /// homeserver.
1117    ///
1118    /// This will always return `None` if the client hasn't been logged in.
1119    ///
1120    /// # Examples
1121    ///
1122    /// ```no_run
1123    /// # use matrix_sdk::{Client, ruma::user_id};
1124    /// # use url::Url;
1125    /// # async {
1126    /// # let alice = user_id!("@alice:example.org");
1127    /// # let homeserver = Url::parse("http://example.com")?;
1128    /// # let client = Client::new(homeserver).await?;
1129    /// let user = client.encryption().request_user_identity(alice).await?;
1130    ///
1131    /// if let Some(user) = user {
1132    ///     println!("User is verified: {:?}", user.is_verified());
1133    ///
1134    ///     let verification = user.request_verification().await?;
1135    /// }
1136    /// # anyhow::Ok(()) };
1137    /// ```
1138    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1139        let olm = self.client.olm_machine().await;
1140        let Some(olm) = olm.as_ref() else { return Ok(None) };
1141
1142        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1143        self.client.keys_query(&request_id, request.device_keys).await?;
1144
1145        let identity = olm.get_identity(user_id, None).await?;
1146        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1147    }
1148
1149    /// Returns a stream of device updates, allowing users to listen for
1150    /// notifications about new or changed devices.
1151    ///
1152    /// The stream produced by this method emits updates whenever a new device
1153    /// is discovered or when an existing device's information is changed. Users
1154    /// can subscribe to this stream and receive updates in real-time.
1155    ///
1156    /// # Examples
1157    ///
1158    /// ```no_run
1159    /// # use matrix_sdk::Client;
1160    /// # use ruma::{device_id, user_id};
1161    /// # use futures_util::{pin_mut, StreamExt};
1162    /// # let client: Client = unimplemented!();
1163    /// # async {
1164    /// let devices_stream = client.encryption().devices_stream().await?;
1165    /// let user_id = client
1166    ///     .user_id()
1167    ///     .expect("We should know our user id after we have logged in");
1168    /// pin_mut!(devices_stream);
1169    ///
1170    /// for device_updates in devices_stream.next().await {
1171    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1172    ///         for device in user_devices.values() {
1173    ///             println!("A new device has been added {}", device.device_id());
1174    ///         }
1175    ///     }
1176    /// }
1177    /// # anyhow::Ok(()) };
1178    /// ```
1179    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1180        let olm = self.client.olm_machine().await;
1181        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1182        let client = self.client.to_owned();
1183
1184        Ok(olm
1185            .store()
1186            .devices_stream()
1187            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1188    }
1189
1190    /// Returns a stream of user identity updates, allowing users to listen for
1191    /// notifications about new or changed user identities.
1192    ///
1193    /// The stream produced by this method emits updates whenever a new user
1194    /// identity is discovered or when an existing identities information is
1195    /// changed. Users can subscribe to this stream and receive updates in
1196    /// real-time.
1197    ///
1198    /// # Examples
1199    ///
1200    /// ```no_run
1201    /// # use matrix_sdk::Client;
1202    /// # use ruma::{device_id, user_id};
1203    /// # use futures_util::{pin_mut, StreamExt};
1204    /// # let client: Client = unimplemented!();
1205    /// # async {
1206    /// let identities_stream =
1207    ///     client.encryption().user_identities_stream().await?;
1208    /// pin_mut!(identities_stream);
1209    ///
1210    /// for identity_updates in identities_stream.next().await {
1211    ///     for (_, identity) in identity_updates.new {
1212    ///         println!("A new identity has been added {}", identity.user_id());
1213    ///     }
1214    /// }
1215    /// # anyhow::Ok(()) };
1216    /// ```
1217    pub async fn user_identities_stream(
1218        &self,
1219    ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1220        let olm = self.client.olm_machine().await;
1221        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1222        let client = self.client.to_owned();
1223
1224        Ok(olm
1225            .store()
1226            .user_identities_stream()
1227            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1228    }
1229
1230    /// Create and upload a new cross signing identity.
1231    ///
1232    /// # Arguments
1233    ///
1234    /// * `auth_data` - This request requires user interactive auth, the first
1235    ///   request needs to set this to `None` and will always fail with an
1236    ///   `UiaaResponse`. The response will contain information for the
1237    ///   interactive auth and the same request needs to be made but this time
1238    ///   with some `auth_data` provided.
1239    ///
1240    /// # Examples
1241    ///
1242    /// ```no_run
1243    /// # use std::collections::BTreeMap;
1244    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1245    /// # use url::Url;
1246    /// # use serde_json::json;
1247    /// # async {
1248    /// # let homeserver = Url::parse("http://example.com")?;
1249    /// # let client = Client::new(homeserver).await?;
1250    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1251    ///     if let Some(response) = e.as_uiaa_response() {
1252    ///         let mut password = uiaa::Password::new(
1253    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1254    ///             "wordpass".to_owned(),
1255    ///         );
1256    ///         password.session = response.session.clone();
1257    ///
1258    ///         client
1259    ///             .encryption()
1260    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1261    ///             .await
1262    ///             .expect("Couldn't bootstrap cross signing")
1263    ///     } else {
1264    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1265    ///     }
1266    /// }
1267    /// # anyhow::Ok(()) };
1268    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1269        let olm = self.client.olm_machine().await;
1270        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1271
1272        let CrossSigningBootstrapRequests {
1273            upload_signing_keys_req,
1274            upload_keys_req,
1275            upload_signatures_req,
1276        } = olm.bootstrap_cross_signing(false).await?;
1277
1278        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1279            auth: auth_data,
1280            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1281            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1282            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1283        });
1284
1285        if let Some(req) = upload_keys_req {
1286            self.client.send_outgoing_request(req).await?;
1287        }
1288        self.client.send(upload_signing_keys_req).await?;
1289        self.client.send(upload_signatures_req).await?;
1290
1291        Ok(())
1292    }
1293
1294    /// Reset the cross-signing keys.
1295    ///
1296    /// # Example
1297    ///
1298    /// ```no_run
1299    /// # use matrix_sdk::{ruma::api::client::uiaa, Client, encryption::CrossSigningResetAuthType};
1300    /// # use url::Url;
1301    /// # async {
1302    /// # let homeserver = Url::parse("http://example.com")?;
1303    /// # let client = Client::new(homeserver).await?;
1304    /// # let user_id = unimplemented!();
1305    /// let encryption = client.encryption();
1306    ///
1307    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1308    ///     match handle.auth_type() {
1309    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1310    ///             use matrix_sdk::ruma::api::client::uiaa;
1311    ///
1312    ///             let password = "1234".to_owned();
1313    ///             let mut password = uiaa::Password::new(user_id, password);
1314    ///             password.session = uiaa.session;
1315    ///
1316    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1317    ///         }
1318    ///         CrossSigningResetAuthType::OAuth(o) => {
1319    ///             println!(
1320    ///                 "To reset your end-to-end encryption cross-signing identity, \
1321    ///                 you first need to approve it at {}",
1322    ///                 o.approval_url
1323    ///             );
1324    ///             handle.auth(None).await?;
1325    ///         }
1326    ///     }
1327    /// }
1328    /// # anyhow::Ok(()) };
1329    /// ```
1330    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1331        let olm = self.client.olm_machine().await;
1332        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1333
1334        let CrossSigningBootstrapRequests {
1335            upload_keys_req,
1336            upload_signing_keys_req,
1337            upload_signatures_req,
1338        } = olm.bootstrap_cross_signing(true).await?;
1339
1340        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1341            auth: None,
1342            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1343            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1344            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1345        });
1346
1347        if let Some(req) = upload_keys_req {
1348            self.client.send_outgoing_request(req).await?;
1349        }
1350
1351        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1352            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1353                let client = self.client.clone();
1354
1355                Ok(Some(CrossSigningResetHandle::new(
1356                    client,
1357                    upload_signing_keys_req,
1358                    upload_signatures_req,
1359                    auth_type,
1360                )))
1361            } else {
1362                Err(error.into())
1363            }
1364        } else {
1365            self.client.send(upload_signatures_req).await?;
1366
1367            Ok(None)
1368        }
1369    }
1370
1371    /// Query the user's own device keys, if, and only if, we didn't have their
1372    /// identity in the first place.
1373    async fn ensure_initial_key_query(&self) -> Result<()> {
1374        let olm_machine = self.client.olm_machine().await;
1375        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1376
1377        let user_id = olm_machine.user_id();
1378
1379        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1380            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1381            self.client.keys_query(&request_id, request.device_keys).await?;
1382        }
1383
1384        Ok(())
1385    }
1386
1387    /// Create and upload a new cross signing identity, if that has not been
1388    /// done yet.
1389    ///
1390    /// This will only create a new cross-signing identity if the user had never
1391    /// done it before. If the user did it before, then this is a no-op.
1392    ///
1393    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1394    /// behavior of this function.
1395    ///
1396    /// # Arguments
1397    ///
1398    /// * `auth_data` - This request requires user interactive auth, the first
1399    ///   request needs to set this to `None` and will always fail with an
1400    ///   `UiaaResponse`. The response will contain information for the
1401    ///   interactive auth and the same request needs to be made but this time
1402    ///   with some `auth_data` provided.
1403    ///
1404    /// # Examples
1405    /// ```no_run
1406    /// # use std::collections::BTreeMap;
1407    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1408    /// # use url::Url;
1409    /// # use serde_json::json;
1410    /// # async {
1411    /// # let homeserver = Url::parse("http://example.com")?;
1412    /// # let client = Client::new(homeserver).await?;
1413    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1414    ///     if let Some(response) = e.as_uiaa_response() {
1415    ///         let mut password = uiaa::Password::new(
1416    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1417    ///             "wordpass".to_owned(),
1418    ///         );
1419    ///         password.session = response.session.clone();
1420    ///
1421    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1422    ///         // avoid checks.
1423    ///         client
1424    ///             .encryption()
1425    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1426    ///             .await
1427    ///             .expect("Couldn't bootstrap cross signing")
1428    ///     } else {
1429    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1430    ///     }
1431    /// }
1432    /// # anyhow::Ok(()) };
1433    pub async fn bootstrap_cross_signing_if_needed(
1434        &self,
1435        auth_data: Option<AuthData>,
1436    ) -> Result<()> {
1437        let olm_machine = self.client.olm_machine().await;
1438        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1439        let user_id = olm_machine.user_id();
1440
1441        self.ensure_initial_key_query().await?;
1442
1443        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1444            self.bootstrap_cross_signing(auth_data).await?;
1445        }
1446
1447        Ok(())
1448    }
1449
1450    /// Export E2EE keys that match the given predicate encrypting them with the
1451    /// given passphrase.
1452    ///
1453    /// # Arguments
1454    ///
1455    /// * `path` - The file path where the exported key file will be saved.
1456    ///
1457    /// * `passphrase` - The passphrase that will be used to encrypt the
1458    ///   exported room keys.
1459    ///
1460    /// * `predicate` - A closure that will be called for every known
1461    ///   `InboundGroupSession`, which represents a room key. If the closure
1462    ///   returns `true` the `InboundGroupSessoin` will be included in the
1463    ///   export, if the closure returns `false` it will not be included.
1464    ///
1465    /// # Panics
1466    ///
1467    /// This method will panic if it isn't run on a Tokio runtime.
1468    ///
1469    /// This method will panic if it can't get enough randomness from the OS to
1470    /// encrypt the exported keys securely.
1471    ///
1472    /// # Examples
1473    ///
1474    /// ```no_run
1475    /// # use std::{path::PathBuf, time::Duration};
1476    /// # use matrix_sdk::{
1477    /// #     Client, config::SyncSettings,
1478    /// #     ruma::room_id,
1479    /// # };
1480    /// # use url::Url;
1481    /// # async {
1482    /// # let homeserver = Url::parse("http://localhost:8080")?;
1483    /// # let mut client = Client::new(homeserver).await?;
1484    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1485    /// // Export all room keys.
1486    /// client
1487    ///     .encryption()
1488    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1489    ///     .await?;
1490    ///
1491    /// // Export only the room keys for a certain room.
1492    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1493    /// let room_id = room_id!("!test:localhost");
1494    ///
1495    /// client
1496    ///     .encryption()
1497    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1498    ///     .await?;
1499    /// # anyhow::Ok(()) };
1500    /// ```
1501    #[cfg(not(target_family = "wasm"))]
1502    pub async fn export_room_keys(
1503        &self,
1504        path: PathBuf,
1505        passphrase: &str,
1506        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1507    ) -> Result<()> {
1508        let olm = self.client.olm_machine().await;
1509        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1510
1511        let keys = olm.store().export_room_keys(predicate).await?;
1512        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1513
1514        let encrypt = move || -> Result<()> {
1515            let export: String =
1516                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1517            let mut file = std::fs::File::create(path)?;
1518            file.write_all(&export.into_bytes())?;
1519            Ok(())
1520        };
1521
1522        let task = tokio::task::spawn_blocking(encrypt);
1523        task.await.expect("Task join error")
1524    }
1525
1526    /// Import E2EE keys from the given file path.
1527    ///
1528    /// # Arguments
1529    ///
1530    /// * `path` - The file path where the exported key file will can be found.
1531    ///
1532    /// * `passphrase` - The passphrase that should be used to decrypt the
1533    ///   exported room keys.
1534    ///
1535    /// Returns a tuple of numbers that represent the number of sessions that
1536    /// were imported and the total number of sessions that were found in the
1537    /// key export.
1538    ///
1539    /// # Panics
1540    ///
1541    /// This method will panic if it isn't run on a Tokio runtime.
1542    ///
1543    /// ```no_run
1544    /// # use std::{path::PathBuf, time::Duration};
1545    /// # use matrix_sdk::{
1546    /// #     Client, config::SyncSettings,
1547    /// #     ruma::room_id,
1548    /// # };
1549    /// # use url::Url;
1550    /// # async {
1551    /// # let homeserver = Url::parse("http://localhost:8080")?;
1552    /// # let mut client = Client::new(homeserver).await?;
1553    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1554    /// let result =
1555    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1556    ///
1557    /// println!(
1558    ///     "Imported {} room keys out of {}",
1559    ///     result.imported_count, result.total_count
1560    /// );
1561    /// # anyhow::Ok(()) };
1562    /// ```
1563    #[cfg(not(target_family = "wasm"))]
1564    pub async fn import_room_keys(
1565        &self,
1566        path: PathBuf,
1567        passphrase: &str,
1568    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1569        let olm = self.client.olm_machine().await;
1570        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1571        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1572
1573        let decrypt = move || {
1574            let file = std::fs::File::open(path)?;
1575            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1576        };
1577
1578        let task = tokio::task::spawn_blocking(decrypt);
1579        let import = task.await.expect("Task join error")?;
1580
1581        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1582
1583        self.backups().maybe_trigger_backup();
1584
1585        Ok(ret)
1586    }
1587
1588    /// Receive notifications of room keys being received as a [`Stream`].
1589    ///
1590    /// Each time a room key is updated in any way, an update will be sent to
1591    /// the stream. Updates that happen at the same time are batched into a
1592    /// [`Vec`].
1593    ///
1594    /// If the reader of the stream lags too far behind, an error is broadcast
1595    /// containing the number of skipped items.
1596    ///
1597    /// # Examples
1598    ///
1599    /// ```no_run
1600    /// # use matrix_sdk::Client;
1601    /// # use url::Url;
1602    /// # async {
1603    /// # let homeserver = Url::parse("http://example.com")?;
1604    /// # let client = Client::new(homeserver).await?;
1605    /// use futures_util::StreamExt;
1606    ///
1607    /// let Some(mut room_keys_stream) =
1608    ///     client.encryption().room_keys_received_stream().await
1609    /// else {
1610    ///     return Ok(());
1611    /// };
1612    ///
1613    /// while let Some(update) = room_keys_stream.next().await {
1614    ///     println!("Received room keys {update:?}");
1615    /// }
1616    /// # anyhow::Ok(()) };
1617    /// ```
1618    pub async fn room_keys_received_stream(
1619        &self,
1620    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1621    {
1622        let olm = self.client.olm_machine().await;
1623        let olm = olm.as_ref()?;
1624
1625        Some(olm.store().room_keys_received_stream())
1626    }
1627
1628    /// Receive notifications of historic room key bundles as a [`Stream`].
1629    ///
1630    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1631    ///
1632    /// Each time a historic room key bundle was received, an update will be
1633    /// sent to the stream. This stream is useful for informative purposes
1634    /// exclusively, historic room key bundles are handled by the SDK
1635    /// automatically.
1636    ///
1637    /// # Examples
1638    ///
1639    /// ```no_run
1640    /// # use matrix_sdk::Client;
1641    /// # use url::Url;
1642    /// # async {
1643    /// # let homeserver = Url::parse("http://example.com")?;
1644    /// # let client = Client::new(homeserver).await?;
1645    /// use futures_util::StreamExt;
1646    ///
1647    /// let Some(mut bundle_stream) =
1648    ///     client.encryption().historic_room_key_stream().await
1649    /// else {
1650    ///     return Ok(());
1651    /// };
1652    ///
1653    /// while let Some(bundle_info) = bundle_stream.next().await {
1654    ///     println!("Received a historic room key bundle {bundle_info:?}");
1655    /// }
1656    /// # anyhow::Ok(()) };
1657    /// ```
1658    pub async fn historic_room_key_stream(
1659        &self,
1660    ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1661        let olm = self.client.olm_machine().await;
1662        let olm = olm.as_ref()?;
1663
1664        Some(olm.store().historic_room_key_stream())
1665    }
1666
1667    /// Get the secret storage manager of the client.
1668    pub fn secret_storage(&self) -> SecretStorage {
1669        SecretStorage { client: self.client.to_owned() }
1670    }
1671
1672    /// Get the backups manager of the client.
1673    pub fn backups(&self) -> Backups {
1674        Backups { client: self.client.to_owned() }
1675    }
1676
1677    /// Get the recovery manager of the client.
1678    pub fn recovery(&self) -> Recovery {
1679        Recovery { client: self.client.to_owned() }
1680    }
1681
1682    /// Enables the crypto-store cross-process lock.
1683    ///
1684    /// This may be required if there are multiple processes that may do writes
1685    /// to the same crypto store. In that case, it's necessary to create a
1686    /// lock, so that only one process writes to it, otherwise this may
1687    /// cause confusing issues because of stale data contained in in-memory
1688    /// caches.
1689    ///
1690    /// The provided `lock_value` must be a unique identifier for this process.
1691    /// Check [`Client::cross_process_store_locks_holder_name`] to
1692    /// get the global value.
1693    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1694        // If the lock has already been created, don't recreate it from scratch.
1695        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1696            let prev_holder = prev_lock.lock_holder();
1697            if prev_holder == lock_value {
1698                return Ok(());
1699            }
1700            warn!(
1701                "Recreating cross-process store lock with a different holder value: \
1702                 prev was {prev_holder}, new is {lock_value}"
1703            );
1704        }
1705
1706        let olm_machine = self.client.base_client().olm_machine().await;
1707        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1708
1709        let lock =
1710            olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1711
1712        // Gently try to initialize the crypto store generation counter.
1713        //
1714        // If we don't get the lock immediately, then it is already acquired by another
1715        // process, and we'll get to reload next time we acquire the lock.
1716        {
1717            let guard = lock.try_lock_once().await?;
1718            if guard.is_some() {
1719                olm_machine
1720                    .initialize_crypto_store_generation(
1721                        &self.client.locks().crypto_store_generation,
1722                    )
1723                    .await?;
1724            }
1725        }
1726
1727        self.client
1728            .locks()
1729            .cross_process_crypto_store_lock
1730            .set(lock)
1731            .map_err(|_| Error::BadCryptoStoreState)?;
1732
1733        Ok(())
1734    }
1735
1736    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1737    /// time.
1738    ///
1739    /// Returns the current generation number.
1740    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1741        let olm_machine_guard = self.client.olm_machine().await;
1742        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1743            let (new_gen, generation_number) = olm_machine
1744                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1745                .await?;
1746            // If the crypto store generation has changed,
1747            if new_gen {
1748                // (get rid of the reference to the current crypto store first)
1749                drop(olm_machine_guard);
1750                // Recreate the OlmMachine.
1751                self.client.base_client().regenerate_olm(None).await?;
1752            }
1753            Ok(generation_number)
1754        } else {
1755            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1756            // been initialised by the time we get here. Ideally we'd panic, or return an
1757            // error, but for now I'm just adding some logging to check if it
1758            // happens, and returning the magic number 0.
1759            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1760            Ok(0)
1761        }
1762    }
1763
1764    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1765    /// spin-waits until the lock is available.
1766    ///
1767    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1768    /// first time.
1769    pub async fn spin_lock_store(
1770        &self,
1771        max_backoff: Option<u32>,
1772    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1773        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1774            let guard = lock.spin_lock(max_backoff).await?;
1775
1776            let generation = self.on_lock_newly_acquired().await?;
1777
1778            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1779        } else {
1780            Ok(None)
1781        }
1782    }
1783
1784    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1785    /// attempts to lock it once.
1786    ///
1787    /// Returns a guard to the lock, if it was obtained.
1788    pub async fn try_lock_store_once(
1789        &self,
1790    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1791        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1792            let maybe_guard = lock.try_lock_once().await?;
1793
1794            let Some(guard) = maybe_guard else {
1795                return Ok(None);
1796            };
1797
1798            let generation = self.on_lock_newly_acquired().await?;
1799
1800            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1801        } else {
1802            Ok(None)
1803        }
1804    }
1805
1806    /// Testing purposes only.
1807    #[cfg(any(test, feature = "testing"))]
1808    pub async fn uploaded_key_count(&self) -> Result<u64> {
1809        let olm_machine = self.client.olm_machine().await;
1810        let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1811        Ok(olm_machine.uploaded_key_count().await?)
1812    }
1813
1814    /// Bootstrap encryption and enables event listeners for the E2EE support.
1815    ///
1816    /// Based on the `EncryptionSettings`, this call might:
1817    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1818    /// - Create a key backup if needed (POST `/room_keys/version`)
1819    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1820    ///
1821    /// As part of this process, and if needed, the current device keys would be
1822    /// uploaded to the server, new account data would be added, and cross
1823    /// signing keys and signatures might be uploaded.
1824    ///
1825    /// Should be called once we
1826    /// created a [`OlmMachine`], i.e. after logging in.
1827    ///
1828    /// # Arguments
1829    ///
1830    /// * `auth_data` - Some requests may require re-authentication. To prevent
1831    ///   the user from having to re-enter their password (or use other
1832    ///   methods), we can provide the authentication data here. This is
1833    ///   necessary for uploading cross-signing keys. However, please note that
1834    ///   there is a proposal (MSC3967) to remove this requirement, which would
1835    ///   allow for the initial upload of cross-signing keys without
1836    ///   authentication, rendering this parameter obsolete.
1837    pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1838        // It's fine to be async here as we're only getting the lock protecting the
1839        // `OlmMachine`. Since the lock shouldn't be that contested right after logging
1840        // in we won't delay the login or restoration of the Client.
1841        let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1842            Some(BundleReceiverTask::new(&self.client).await)
1843        } else {
1844            None
1845        };
1846
1847        let mut tasks = self.client.inner.e2ee.tasks.lock();
1848
1849        let this = self.clone();
1850
1851        tasks.setup_e2ee = Some(spawn(async move {
1852            // Update the current state first, so we don't have to wait for the result of
1853            // network requests
1854            this.update_verification_state().await;
1855
1856            if this.settings().auto_enable_cross_signing
1857                && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1858            {
1859                error!("Couldn't bootstrap cross signing {e:?}");
1860            }
1861
1862            if let Err(e) = this.backups().setup_and_resume().await {
1863                error!("Couldn't setup and resume backups {e:?}");
1864            }
1865            if let Err(e) = this.recovery().setup().await {
1866                error!("Couldn't setup and resume recovery {e:?}");
1867            }
1868        }));
1869
1870        tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1871    }
1872
1873    /// Waits for end-to-end encryption initialization tasks to finish, if any
1874    /// was running in the background.
1875    pub async fn wait_for_e2ee_initialization_tasks(&self) {
1876        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1877
1878        if let Some(task) = task
1879            && let Err(err) = task.await
1880        {
1881            warn!("Error when initializing backups: {err}");
1882        }
1883    }
1884
1885    /// Upload the device keys and initial set of one-time keys to the server.
1886    ///
1887    /// This should only be called when the user logs in for the first time,
1888    /// the method will ensure that other devices see our own device as an
1889    /// end-to-end encryption enabled one.
1890    ///
1891    /// **Warning**: Do not use this method if we're already calling
1892    /// [`Client::send_outgoing_request()`]. This method is intended for
1893    /// explicitly uploading the device keys before starting a sync.
1894    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1895        let olm = self.client.olm_machine().await;
1896        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1897
1898        if let Some((request_id, request)) = olm.upload_device_keys().await? {
1899            self.client.keys_upload(&request_id, &request).await?;
1900
1901            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1902            self.client.keys_query(&request_id, request.device_keys).await?;
1903        }
1904
1905        Ok(())
1906    }
1907
1908    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1909        self.recovery().update_state_after_keys_query(response).await;
1910
1911        // Only update the verification_state if our own devices changed
1912        if let Some(user_id) = self.client.user_id() {
1913            let contains_own_device = response.device_keys.contains_key(user_id);
1914
1915            if contains_own_device {
1916                self.update_verification_state().await;
1917            }
1918        }
1919    }
1920
1921    async fn update_verification_state(&self) {
1922        match self.get_own_device().await {
1923            Ok(device) => {
1924                if let Some(device) = device {
1925                    let is_verified = device.is_cross_signed_by_owner();
1926
1927                    if is_verified {
1928                        self.client.inner.verification_state.set(VerificationState::Verified);
1929                    } else {
1930                        self.client.inner.verification_state.set(VerificationState::Unverified);
1931                    }
1932                } else {
1933                    warn!("Couldn't find out own device in the store.");
1934                    self.client.inner.verification_state.set(VerificationState::Unknown);
1935                }
1936            }
1937            Err(error) => {
1938                warn!("Failed retrieving own device: {error}");
1939                self.client.inner.verification_state.set(VerificationState::Unknown);
1940            }
1941        }
1942    }
1943
1944    /// Encrypts then send the given content via the `/sendToDevice` end-point
1945    /// using Olm encryption.
1946    ///
1947    /// If there are a lot of recipient devices multiple `/sendToDevice`
1948    /// requests might be sent out.
1949    ///
1950    /// # Returns
1951    /// A list of failures. The list of devices that couldn't get the messages.
1952    #[cfg(feature = "experimental-send-custom-to-device")]
1953    pub async fn encrypt_and_send_raw_to_device(
1954        &self,
1955        recipient_devices: Vec<&Device>,
1956        event_type: &str,
1957        content: Raw<AnyToDeviceEventContent>,
1958        share_strategy: CollectStrategy,
1959    ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
1960        let users = recipient_devices.iter().map(|device| device.user_id());
1961
1962        // Will claim one-time-key for users that needs it
1963        // TODO: For later optimisation: This will establish missing olm sessions with
1964        // all this users devices, but we just want for some devices.
1965        self.client.claim_one_time_keys(users).await?;
1966
1967        let olm = self.client.olm_machine().await;
1968        let olm = olm.as_ref().expect("Olm machine wasn't started");
1969
1970        let (requests, withhelds) = olm
1971            .encrypt_content_for_devices(
1972                recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
1973                event_type,
1974                &content
1975                    .deserialize_as::<serde_json::Value>()
1976                    .expect("Deserialize as Value will always work"),
1977                share_strategy,
1978            )
1979            .await?;
1980
1981        let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
1982
1983        // Push the withhelds in the failures
1984        withhelds.iter().for_each(|(d, _)| {
1985            failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
1986        });
1987
1988        // TODO: parallelize that? it's already grouping 250 devices per chunk.
1989        for request in requests {
1990            let ruma_request = RumaToDeviceRequest::new_raw(
1991                request.event_type.clone(),
1992                request.txn_id.clone(),
1993                request.messages.clone(),
1994            );
1995
1996            let send_result = self
1997                .client
1998                .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
1999                .await;
2000
2001            // If the sending failed we need to collect the failures to report them
2002            if send_result.is_err() {
2003                // Mark the sending as failed
2004                for (user_id, device_map) in request.messages {
2005                    for device_id in device_map.keys() {
2006                        match device_id {
2007                            DeviceIdOrAllDevices::DeviceId(device_id) => {
2008                                failures.push((user_id.clone(), device_id.to_owned()));
2009                            }
2010                            DeviceIdOrAllDevices::AllDevices => {
2011                                // Cannot happen in this case
2012                            }
2013                        }
2014                    }
2015                }
2016            }
2017        }
2018
2019        Ok(failures)
2020    }
2021}
2022
2023#[cfg(all(test, not(target_family = "wasm")))]
2024mod tests {
2025    use std::{
2026        ops::Not,
2027        str::FromStr,
2028        sync::{
2029            Arc,
2030            atomic::{AtomicBool, Ordering},
2031        },
2032        time::Duration,
2033    };
2034
2035    use matrix_sdk_test::{
2036        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
2037        event_factory::EventFactory, test_json,
2038    };
2039    use ruma::{
2040        event_id,
2041        events::{reaction::ReactionEventContent, relation::Annotation},
2042        user_id,
2043    };
2044    use serde_json::json;
2045    use wiremock::{
2046        Mock, MockServer, Request, ResponseTemplate,
2047        matchers::{header, method, path_regex},
2048    };
2049
2050    use crate::{
2051        Client, assert_next_matches_with_timeout,
2052        config::RequestConfig,
2053        encryption::{
2054            DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2055        },
2056        test_utils::{
2057            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2058        },
2059    };
2060
2061    #[async_test]
2062    async fn test_reaction_sending() {
2063        let server = MockServer::start().await;
2064        let client = logged_in_client(Some(server.uri())).await;
2065
2066        let event_id = event_id!("$2:example.org");
2067
2068        Mock::given(method("GET"))
2069            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2070            .and(header("authorization", "Bearer 1234"))
2071            .respond_with(
2072                ResponseTemplate::new(200)
2073                    .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
2074            )
2075            .mount(&server)
2076            .await;
2077
2078        Mock::given(method("PUT"))
2079            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2080            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2081                "event_id": event_id,
2082            })))
2083            .mount(&server)
2084            .await;
2085
2086        let response = SyncResponseBuilder::default()
2087            .add_joined_room(
2088                JoinedRoomBuilder::default()
2089                    .add_state_event(StateTestEvent::Member)
2090                    .add_state_event(StateTestEvent::PowerLevels)
2091                    .add_state_event(StateTestEvent::Encryption),
2092            )
2093            .build_sync_response();
2094
2095        client.base_client().receive_sync_response(response).await.unwrap();
2096
2097        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2098        assert!(
2099            room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2100        );
2101
2102        let event_id = event_id!("$1:example.org");
2103        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2104        room.send(reaction).await.expect("Sending the reaction should not fail");
2105
2106        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2107    }
2108
2109    #[async_test]
2110    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
2111        let server = MockServer::start().await;
2112        let client = logged_in_client(Some(server.uri())).await;
2113        // This is the user ID that is inside MemberAdditional.
2114        // Note the confusing username, so we can share
2115        // GlobalAccountDataTestEvent::Direct with the invited test.
2116        let user_id = user_id!("@invited:localhost");
2117
2118        // When we receive a sync response saying "invited" is invited to a DM
2119        let f = EventFactory::new();
2120        let response = SyncResponseBuilder::default()
2121            .add_joined_room(
2122                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
2123            )
2124            .add_global_account_data(
2125                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2126            )
2127            .build_sync_response();
2128        client.base_client().receive_sync_response(response).await.unwrap();
2129
2130        // Then get_dm_room finds this room
2131        let found_room = client.get_dm_room(user_id).expect("DM not found!");
2132        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2133    }
2134
2135    #[async_test]
2136    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
2137        let server = MockServer::start().await;
2138        let client = logged_in_client(Some(server.uri())).await;
2139        // This is the user ID that is inside MemberInvite
2140        let user_id = user_id!("@invited:localhost");
2141
2142        // When we receive a sync response saying "invited" is invited to a DM
2143        let f = EventFactory::new();
2144        let response = SyncResponseBuilder::default()
2145            .add_joined_room(
2146                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
2147            )
2148            .add_global_account_data(
2149                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2150            )
2151            .build_sync_response();
2152        client.base_client().receive_sync_response(response).await.unwrap();
2153
2154        // Then get_dm_room finds this room
2155        let found_room = client.get_dm_room(user_id).expect("DM not found!");
2156        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2157    }
2158
2159    #[async_test]
2160    async fn test_get_dm_room_still_finds_left_room() {
2161        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
2162        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
2163
2164        let server = MockServer::start().await;
2165        let client = logged_in_client(Some(server.uri())).await;
2166        // This is the user ID that is inside MemberAdditional.
2167        // Note the confusing username, so we can share
2168        // GlobalAccountDataTestEvent::Direct with the invited test.
2169        let user_id = user_id!("@invited:localhost");
2170
2171        // When we receive a sync response saying "invited" is invited to a DM
2172        let f = EventFactory::new();
2173        let response = SyncResponseBuilder::default()
2174            .add_joined_room(
2175                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
2176            )
2177            .add_global_account_data(
2178                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2179            )
2180            .build_sync_response();
2181        client.base_client().receive_sync_response(response).await.unwrap();
2182
2183        // Then get_dm_room finds this room
2184        let found_room = client.get_dm_room(user_id).expect("DM not found!");
2185        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2186    }
2187
2188    #[cfg(feature = "sqlite")]
2189    #[async_test]
2190    async fn test_generation_counter_invalidates_olm_machine() {
2191        // Create two clients using the same sqlite database.
2192
2193        use matrix_sdk_base::store::RoomLoadSettings;
2194        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
2195        let session = mock_matrix_session();
2196
2197        let client1 = Client::builder()
2198            .homeserver_url("http://localhost:1234")
2199            .request_config(RequestConfig::new().disable_retry())
2200            .sqlite_store(&sqlite_path, None)
2201            .build()
2202            .await
2203            .unwrap();
2204        client1
2205            .matrix_auth()
2206            .restore_session(session.clone(), RoomLoadSettings::default())
2207            .await
2208            .unwrap();
2209
2210        let client2 = Client::builder()
2211            .homeserver_url("http://localhost:1234")
2212            .request_config(RequestConfig::new().disable_retry())
2213            .sqlite_store(sqlite_path, None)
2214            .build()
2215            .await
2216            .unwrap();
2217        client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2218
2219        // When the lock isn't enabled, any attempt at locking won't return a guard.
2220        let guard = client1.encryption().try_lock_store_once().await.unwrap();
2221        assert!(guard.is_none());
2222
2223        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2224        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2225
2226        // One client can take the lock.
2227        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2228        assert!(acquired1.is_some());
2229
2230        // Keep the olm machine, so we can see if it's changed later, by comparing Arcs.
2231        let initial_olm_machine =
2232            client1.olm_machine().await.clone().expect("must have an olm machine");
2233
2234        // Also enable backup to check that new machine has the same backup keys.
2235        let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
2236            .expect("Can't create new recovery key");
2237        let backup_key = decryption_key.megolm_v1_public_key();
2238        backup_key.set_version("1".to_owned());
2239        initial_olm_machine
2240            .backup_machine()
2241            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2242            .await
2243            .expect("Should save");
2244
2245        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2246
2247        assert!(client1.encryption().backups().are_enabled().await);
2248
2249        // The other client can't take the lock too.
2250        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2251        assert!(acquired2.is_none());
2252
2253        // Now have the first client release the lock,
2254        drop(acquired1);
2255        tokio::time::sleep(Duration::from_millis(100)).await;
2256
2257        // And re-take it.
2258        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2259        assert!(acquired1.is_some());
2260
2261        // In that case, the Olm Machine shouldn't change.
2262        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2263        assert!(initial_olm_machine.same_as(&olm_machine));
2264
2265        // Ok, release again.
2266        drop(acquired1);
2267        tokio::time::sleep(Duration::from_millis(100)).await;
2268
2269        // Client2 can acquire the lock.
2270        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2271        assert!(acquired2.is_some());
2272
2273        // And then release it.
2274        drop(acquired2);
2275        tokio::time::sleep(Duration::from_millis(100)).await;
2276
2277        // Client1 can acquire it again,
2278        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2279        assert!(acquired1.is_some());
2280
2281        // But now its olm machine has been invalidated and thus regenerated!
2282        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2283
2284        assert!(!initial_olm_machine.same_as(&olm_machine));
2285
2286        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2287        assert!(backup_key_new.decryption_key.is_some());
2288        assert_eq!(
2289            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2290            backup_key.to_base64()
2291        );
2292        assert!(client1.encryption().backups().are_enabled().await);
2293    }
2294
2295    #[cfg(feature = "sqlite")]
2296    #[async_test]
2297    async fn test_generation_counter_no_spurious_invalidation() {
2298        // Create two clients using the same sqlite database.
2299
2300        use matrix_sdk_base::store::RoomLoadSettings;
2301        let sqlite_path =
2302            std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2303        let session = mock_matrix_session();
2304
2305        let client = Client::builder()
2306            .homeserver_url("http://localhost:1234")
2307            .request_config(RequestConfig::new().disable_retry())
2308            .sqlite_store(&sqlite_path, None)
2309            .build()
2310            .await
2311            .unwrap();
2312        client
2313            .matrix_auth()
2314            .restore_session(session.clone(), RoomLoadSettings::default())
2315            .await
2316            .unwrap();
2317
2318        let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2319
2320        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2321
2322        // Enabling the lock doesn't update the olm machine.
2323        let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2324        assert!(initial_olm_machine.same_as(&after_enabling_lock));
2325
2326        {
2327            // Simulate that another client hold the lock before.
2328            let client2 = Client::builder()
2329                .homeserver_url("http://localhost:1234")
2330                .request_config(RequestConfig::new().disable_retry())
2331                .sqlite_store(sqlite_path, None)
2332                .build()
2333                .await
2334                .unwrap();
2335            client2
2336                .matrix_auth()
2337                .restore_session(session, RoomLoadSettings::default())
2338                .await
2339                .unwrap();
2340
2341            client2
2342                .encryption()
2343                .enable_cross_process_store_lock("client2".to_owned())
2344                .await
2345                .unwrap();
2346
2347            let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2348            assert!(guard.is_some());
2349
2350            drop(guard);
2351            tokio::time::sleep(Duration::from_millis(100)).await;
2352        }
2353
2354        {
2355            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2356            assert!(acquired.is_some());
2357        }
2358
2359        // Taking the lock the first time will update the olm machine.
2360        let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2361        assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2362
2363        {
2364            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2365            assert!(acquired.is_some());
2366        }
2367
2368        // Re-taking the lock doesn't update the olm machine.
2369        let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2370        assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2371    }
2372
2373    #[async_test]
2374    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2375        // Given a client and a server
2376        let client = no_retry_test_client(None).await;
2377        let server = MockServer::start().await;
2378
2379        // When we subscribe to its verification state
2380        let mut verification_state = client.encryption().verification_state();
2381
2382        // We can get its initial value, and it's Unknown
2383        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2384
2385        // We set up a mocked request to check this endpoint is not called before
2386        // reading the new state
2387        let keys_requested = Arc::new(AtomicBool::new(false));
2388        let inner_bool = keys_requested.clone();
2389
2390        Mock::given(method("GET"))
2391            .and(path_regex(
2392                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2393            ))
2394            .respond_with(move |_req: &Request| {
2395                inner_bool.fetch_or(true, Ordering::SeqCst);
2396                ResponseTemplate::new(200).set_body_json(json!({}))
2397            })
2398            .mount(&server)
2399            .await;
2400
2401        // When the session is initialised and the encryption tasks spawn
2402        set_client_session(&client).await;
2403
2404        // Then we can get an updated value without waiting for any network requests
2405        assert!(keys_requested.load(Ordering::SeqCst).not());
2406        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2407    }
2408
2409    #[test]
2410    fn test_oauth_reset_info_from_uiaa_info() {
2411        let auth_info = json!({
2412            "session": "dummy",
2413            "flows": [
2414                {
2415                    "stages": [
2416                        "org.matrix.cross_signing_reset"
2417                    ]
2418                }
2419            ],
2420            "params": {
2421                "org.matrix.cross_signing_reset": {
2422                    "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2423                }
2424            },
2425            "msg": "To reset..."
2426        });
2427
2428        let auth_info = serde_json::from_value(auth_info)
2429            .expect("We should be able to deserialize the UiaaInfo");
2430        OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2431            .expect("We should be able to fetch the cross-signing reset info from the auth info");
2432    }
2433
2434    #[test]
2435    fn test_duplicate_one_time_key_error_parsing() {
2436        let message = concat!(
2437            r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2438            r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2439            r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2440            r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2441            r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2442            r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2443            r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2444        );
2445        let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2446            .expect("We should be able to parse the error message");
2447
2448        assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2449        assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2450
2451        DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2452            .expect_err("We shouldn't be able to parse an incomplete error message");
2453    }
2454
2455    // Helper function for the test_devices_to_verify_against_* tests.  Make a
2456    // response to a /keys/query request using the given device keys and a
2457    // pre-defined set of cross-signing keys.
2458    fn devices_to_verify_against_keys_query_response(
2459        devices: Vec<serde_json::Value>,
2460    ) -> serde_json::Value {
2461        let device_keys: serde_json::Map<String, serde_json::Value> = devices
2462            .into_iter()
2463            .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2464            .collect();
2465        json!({
2466            "device_keys": {
2467                "@example:localhost": device_keys,
2468            },
2469            "master_keys": {
2470                "@example:localhost": {
2471                    "keys": {
2472                        "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2473                    },
2474                    "usage": ["master"],
2475                    "user_id": "@example:localhost",
2476                },
2477            },
2478            "self_signing_keys": {
2479                "@example:localhost": {
2480                    "keys": {
2481                        "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2482                    },
2483                    "usage": ["self_signing"],
2484                    "user_id": "@example:localhost",
2485                    "signatures": {
2486                        "@example:localhost": {
2487                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2488                        },
2489                    },
2490                },
2491            },
2492            "user_signing_keys": {
2493                "@example:localhost": {
2494                    "keys": {
2495                        "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2496                    },
2497                    "usage": ["user_signing"],
2498                    "user_id": "@example:localhost",
2499                    "signatures": {
2500                        "@example:localhost": {
2501                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2502                        },
2503                    },
2504                },
2505            }
2506        })
2507    }
2508
2509    // The following three tests test that we can detect whether the user has
2510    // other devices that they can verify against under different conditions.
2511    #[async_test]
2512    /// Test that we detect that can't verify against another device if we have
2513    /// no devices.
2514    async fn test_devices_to_verify_against_no_devices() {
2515        let server = MockServer::start().await;
2516        let client = logged_in_client(Some(server.uri())).await;
2517
2518        Mock::given(method("POST"))
2519            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2520            .respond_with(
2521                ResponseTemplate::new(200)
2522                    .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2523            )
2524            .mount(&server)
2525            .await;
2526
2527        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2528    }
2529
2530    #[async_test]
2531    /// Test that we detect that we can verify against another cross-signed
2532    /// regular device.
2533    async fn test_devices_to_verify_against_cross_signed() {
2534        let server = MockServer::start().await;
2535        let client = logged_in_client(Some(server.uri())).await;
2536
2537        Mock::given(method("POST"))
2538            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2539            .respond_with(ResponseTemplate::new(200).set_body_json(
2540                devices_to_verify_against_keys_query_response(vec![
2541                    json!({
2542                        "algorithms": [
2543                            "m.olm.v1.curve25519-aes-sha2",
2544                            "m.megolm.v1.aes-sha2",
2545                        ],
2546                        "user_id": "@example:localhost",
2547                        "device_id": "SIGNEDDEVICE",
2548                        "keys": {
2549                            "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2550                            "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2551                        },
2552                        "signatures": {
2553                            "@example:localhost": {
2554                                "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2555                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2556                            },
2557                        },
2558                    })
2559                ])
2560            ))
2561            .mount(&server)
2562            .await;
2563
2564        assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2565    }
2566
2567    #[async_test]
2568    /// Test that we detect that we can't verify against a dehydrated or
2569    /// unsigned device.
2570    async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2571        let server = MockServer::start().await;
2572        let client = logged_in_client(Some(server.uri())).await;
2573        let user_id = client.user_id().unwrap();
2574        let olm_machine = client.olm_machine().await;
2575        let olm_machine = olm_machine.as_ref().unwrap();
2576
2577        Mock::given(method("POST"))
2578            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2579            .respond_with(ResponseTemplate::new(200).set_body_json(
2580                devices_to_verify_against_keys_query_response(vec![
2581                    json!({
2582                        "algorithms": [
2583                            "m.olm.v1.curve25519-aes-sha2",
2584                            "m.megolm.v1.aes-sha2",
2585                        ],
2586                        "user_id": "@example:localhost",
2587                        "device_id": "DEHYDRATEDDEVICE",
2588                        "keys": {
2589                            "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2590                            "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2591                        },
2592                        "dehydrated": true,
2593                        "signatures": {
2594                            "@example:localhost": {
2595                                "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2596                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2597                            },
2598                        },
2599                    }),
2600                    json!({
2601                        "algorithms": [
2602                            "m.olm.v1.curve25519-aes-sha2",
2603                            "m.megolm.v1.aes-sha2",
2604                        ],
2605                        "user_id": "@example:localhost",
2606                        "device_id": "UNSIGNEDDEVICE",
2607                        "keys": {
2608                            "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2609                            "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2610                        },
2611                        "signatures": {
2612                            "@example:localhost": {
2613                                "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2614                            },
2615                        },
2616                    }),
2617                ])
2618            ))
2619            .mount(&server)
2620            .await;
2621
2622        let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2623        client.keys_query(&request_id, request.device_keys).await.unwrap();
2624
2625        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2626    }
2627}