matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22    collections::{BTreeMap, HashSet},
23    io::{Cursor, Read, Write},
24    iter,
25    path::PathBuf,
26    str::FromStr,
27    sync::Arc,
28};
29
30use eyeball::{SharedObservable, Subscriber};
31use futures_core::Stream;
32use futures_util::{
33    future::try_join,
34    stream::{self, StreamExt},
35};
36#[cfg(feature = "experimental-send-custom-to-device")]
37use matrix_sdk_base::crypto::CollectStrategy;
38use matrix_sdk_base::{
39    StateStoreDataKey, StateStoreDataValue,
40    cross_process_lock::CrossProcessLockError,
41    crypto::{
42        CrossSigningBootstrapRequests, OlmMachine,
43        store::types::{RoomKeyBundleInfo, RoomKeyInfo},
44        types::{
45            SignedKey,
46            requests::{
47                OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
48            },
49        },
50    },
51};
52use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
53use ruma::{
54    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
55    api::client::{
56        error::{ErrorBody, StandardErrorBody},
57        keys::{
58            get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
59            upload_signing_keys::v3::Request as UploadSigningKeysRequest,
60        },
61        message::send_message_event,
62        to_device::send_event_to_device::v3::{
63            Request as RumaToDeviceRequest, Response as ToDeviceResponse,
64        },
65        uiaa::{AuthData, UiaaInfo},
66    },
67    assign,
68    events::room::{MediaSource, ThumbnailInfo},
69};
70#[cfg(feature = "experimental-send-custom-to-device")]
71use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
72use serde::{Deserialize, de::Error as _};
73use tasks::BundleReceiverTask;
74use tokio::sync::{Mutex, RwLockReadGuard};
75use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
76use tracing::{debug, error, instrument, warn};
77use url::Url;
78use vodozemac::Curve25519PublicKey;
79
80use self::{
81    backups::{Backups, types::BackupClientState},
82    futures::UploadEncryptedFile,
83    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
84    recovery::{Recovery, RecoveryState},
85    secret_storage::SecretStorage,
86    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
87    verification::{SasVerification, Verification, VerificationRequest},
88};
89use crate::{
90    Client, Error, HttpError, Result, RumaApiError, TransmissionProgress,
91    attachment::Thumbnail,
92    client::{ClientInner, WeakClient},
93    cross_process_lock::CrossProcessLockGuard,
94    error::HttpResult,
95};
96
97pub mod backups;
98pub mod futures;
99pub mod identities;
100pub mod recovery;
101pub mod secret_storage;
102pub(crate) mod tasks;
103pub mod verification;
104
105pub use matrix_sdk_base::crypto::{
106    CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
107    MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
108    SessionCreationError, SignatureError, VERSION,
109    olm::{
110        SessionCreationError as MegolmSessionCreationError,
111        SessionExportError as OlmSessionExportError,
112    },
113    vodozemac,
114};
115
116#[cfg(feature = "experimental-send-custom-to-device")]
117use crate::config::RequestConfig;
118pub use crate::error::RoomKeyImportError;
119
/// All the data related to the encryption state.
///
/// Holds the background tasks, the configured settings and the observable
/// backup/recovery state of the end-to-end encryption machinery.
pub(crate) struct EncryptionData {
    /// Background tasks related to encryption (key backup, initialization
    /// tasks, etc.).
    pub tasks: StdMutex<ClientTasks>,

    /// End-to-end encryption settings.
    pub encryption_settings: EncryptionSettings,

    /// All state related to key backup.
    pub backup_state: BackupClientState,

    /// All state related to secret storage recovery.
    pub recovery_state: SharedObservable<RecoveryState>,
}
135
136impl EncryptionData {
137    pub fn new(encryption_settings: EncryptionSettings) -> Self {
138        Self {
139            encryption_settings,
140
141            tasks: StdMutex::new(Default::default()),
142            backup_state: Default::default(),
143            recovery_state: Default::default(),
144        }
145    }
146
147    pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
148        let weak_client = WeakClient::from_inner(client);
149
150        let mut tasks = self.tasks.lock();
151        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
152
153        if self.encryption_settings.backup_download_strategy
154            == BackupDownloadStrategy::AfterDecryptionFailure
155        {
156            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
157        }
158    }
159
160    /// Initialize the background task which listens for changes in the
161    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
162    ///
163    /// This should happen after the usual tasks have been set up and after the
164    /// E2EE initialization tasks have been set up.
165    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
166        let mut guard = self.tasks.lock();
167
168        let future = Recovery::update_state_after_backup_state_change(client);
169        let join_handle = spawn(future);
170
171        guard.update_recovery_state_after_backup = Some(join_handle);
172    }
173}
174
/// Settings for end-to-end encryption features.
#[derive(Clone, Copy, Debug, Default)]
pub struct EncryptionSettings {
    /// Automatically bootstrap cross-signing for a user once they're logged
    /// in, in case it's not already done yet.
    ///
    /// This requires to login with a username and password, or that MSC3967 is
    /// enabled on the server, as of 2023-10-20.
    pub auto_enable_cross_signing: bool,

    /// Select a strategy to download room keys from the backup, by default room
    /// keys won't be downloaded from the backup automatically.
    ///
    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
    pub backup_download_strategy: BackupDownloadStrategy,

    /// Automatically create a backup version if no backup exists.
    pub auto_enable_backups: bool,
}
194
/// The strategy used to download room keys from the key backup.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// Automatically download all room keys from the backup when the backup
    /// recovery key has been received. The backup recovery key can be received
    /// in two ways:
    ///
    /// 1. Received as a `m.secret.send` to-device event, after a successful
    ///    interactive verification.
    /// 2. Imported from secret storage (4S) using the
    ///    [`SecretStore::import_secrets()`] method.
    ///
    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
    OneShot,

    /// Attempt to download a single room key if an event fails to be decrypted.
    AfterDecryptionFailure,

    /// Don't download any room keys automatically. The user can manually
    /// download room keys using the [`Backups::download_room_key()`] methods.
    ///
    /// This is the default option.
    #[default]
    Manual,
}
221
/// The verification state of our own device.
///
/// This enum tells us if our own user identity trusts this device, in other
/// words it tells us if the user identity has signed the device.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VerificationState {
    /// The verification state is unknown for now.
    Unknown,
    /// The device is considered to be verified, it has been signed by its user
    /// identity.
    Verified,
    /// The device is unverified.
    Unverified,
}
236
/// Wraps together a `CrossProcessLockGuard` and a generation number.
#[derive(Debug)]
pub struct CrossProcessLockStoreGuardWithGeneration {
    /// The lock guard; never read, only kept so that it lives as long as this
    /// struct does.
    _guard: CrossProcessLockGuard,
    /// The generation number exposed through
    /// [`CrossProcessLockStoreGuardWithGeneration::generation()`].
    generation: u64,
}
243
impl CrossProcessLockStoreGuardWithGeneration {
    /// Return the Crypto Store generation associated with this store lock.
    ///
    /// This is the value stored in the guard when it was created.
    pub fn generation(&self) -> u64 {
        self.generation
    }
}
250
/// A stateful struct remembering the cross-signing keys we need to upload.
///
/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
/// additional authentication, this struct will contain information on the type
/// of authentication the user needs to complete before the upload might be
/// continued.
///
/// More info can be found in the [spec].
///
/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
#[derive(Debug)]
pub struct CrossSigningResetHandle {
    /// The client used to send the requests.
    client: Client,
    /// The cross-signing keys upload request which is (re)sent by
    /// [`CrossSigningResetHandle::auth()`].
    upload_request: UploadSigningKeysRequest,
    /// The signature upload request which is sent once the keys upload
    /// succeeded.
    signatures_request: UploadSignaturesRequest,
    /// The kind of authentication the user needs to complete.
    auth_type: CrossSigningResetAuthType,
    /// Set to `true` by [`CrossSigningResetHandle::cancel()`], checked by
    /// [`CrossSigningResetHandle::auth()`] after a failed upload attempt.
    is_cancelled: Mutex<bool>,
}
269
270impl CrossSigningResetHandle {
271    /// Set up a new `CrossSigningResetHandle`.
272    pub fn new(
273        client: Client,
274        upload_request: UploadSigningKeysRequest,
275        signatures_request: UploadSignaturesRequest,
276        auth_type: CrossSigningResetAuthType,
277    ) -> Self {
278        Self {
279            client,
280            upload_request,
281            signatures_request,
282            auth_type,
283            is_cancelled: Mutex::new(false),
284        }
285    }
286
287    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
288    /// is using.
289    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
290        &self.auth_type
291    }
292
293    /// Continue the cross-signing reset by either waiting for the
294    /// authentication to be done on the side of the OAuth 2.0 server or by
295    /// providing additional [`AuthData`] the homeserver requires.
296    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
297        let mut upload_request = self.upload_request.clone();
298        upload_request.auth = auth;
299
300        while let Err(e) = self.client.send(upload_request.clone()).await {
301            if *self.is_cancelled.lock().await {
302                return Ok(());
303            }
304
305            match e.as_uiaa_response() {
306                Some(uiaa_info) => {
307                    if uiaa_info.auth_error.is_some() {
308                        return Err(e.into());
309                    }
310                }
311                None => return Err(e.into()),
312            }
313        }
314
315        self.client.send(self.signatures_request.clone()).await?;
316
317        Ok(())
318    }
319
320    /// Cancel the ongoing identity reset process
321    pub async fn cancel(&self) {
322        *self.is_cancelled.lock().await = true;
323    }
324}
325
/// Information about the additional authentication that is required before the
/// cross-signing keys can be uploaded.
#[derive(Debug, Clone)]
pub enum CrossSigningResetAuthType {
    /// The homeserver requires user-interactive authentication.
    Uiaa(UiaaInfo),
    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
    /// approve the upload of cross-signing keys.
    OAuth(OAuthCrossSigningResetInfo),
}
336
337impl CrossSigningResetAuthType {
338    fn new(error: &HttpError) -> Result<Option<Self>> {
339        if let Some(auth_info) = error.as_uiaa_response() {
340            if let Ok(auth_info) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
341                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
342            } else {
343                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
344            }
345        } else {
346            Ok(None)
347        }
348    }
349}
350
/// OAuth 2.0 specific information about the required authentication for the
/// upload of cross-signing keys.
#[derive(Debug, Clone, Deserialize)]
pub struct OAuthCrossSigningResetInfo {
    /// The URL where the user can approve the reset of the cross-signing keys.
    pub approval_url: Url,
}
358
359impl OAuthCrossSigningResetInfo {
360    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Self> {
361        let parameters = serde_json::from_str::<OAuthCrossSigningResetUiaaParameters>(
362            auth_info.params.as_ref().map(|value| value.get()).unwrap_or_default(),
363        )?;
364
365        Ok(OAuthCrossSigningResetInfo { approval_url: parameters.reset.url })
366    }
367}
368
/// The parsed `parameters` part of a [`ruma::api::client::uiaa::UiaaInfo`]
/// response
#[derive(Debug, Deserialize)]
struct OAuthCrossSigningResetUiaaParameters {
    /// The `org.matrix.cross_signing_reset` entry, which holds the URL where
    /// the user can approve the reset of the cross-signing keys.
    #[serde(rename = "org.matrix.cross_signing_reset")]
    reset: OAuthCrossSigningResetUiaaResetParameter,
}
377
/// The `org.matrix.cross_signing_reset` part of the Uiaa response `parameters`
/// dictionary.
#[derive(Debug, Deserialize)]
struct OAuthCrossSigningResetUiaaResetParameter {
    /// The URL where the user can approve the reset of the cross-signing keys.
    url: Url,
}
385
/// A struct that helps to parse the custom error message Synapse posts if a
/// duplicate one-time key is uploaded.
///
/// Use the [`FromStr`] implementation to parse the error message.
#[derive(Debug)]
struct DuplicateOneTimeKeyErrorMessage {
    /// The previously uploaded one-time key.
    old_key: Curve25519PublicKey,
    /// The one-time key we're attempting to upload right now.
    new_key: Curve25519PublicKey,
}
395
396impl FromStr for DuplicateOneTimeKeyErrorMessage {
397    type Err = serde_json::Error;
398
399    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
400        // First we split the string into two parts, the part containing the old key and
401        // the part containing the new key. The parts are conveniently separated
402        // by a `;` character.
403        let mut split = s.split_terminator(';');
404
405        let old_key = split
406            .next()
407            .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
408        let new_key = split
409            .next()
410            .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
411
412        // Now we remove the lengthy prefix from the part containing the old key, we
413        // should be left with just the JSON of the signed key.
414        let old_key_index = old_key
415            .find("Old key:")
416            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
417
418        let old_key = old_key[old_key_index..]
419            .trim()
420            .strip_prefix("Old key:")
421            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
422
423        // The part containing the new key is much simpler, we just remove a static
424        // prefix.
425        let new_key = new_key
426            .trim()
427            .strip_prefix("new key:")
428            .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
429
430        // The JSON containing the new key is for some reason quoted using single
431        // quotes, so let's replace them with normal double quotes.
432        let new_key = new_key.replace("'", "\"");
433
434        // Let's deserialize now.
435        let old_key: SignedKey = serde_json::from_str(old_key)?;
436        let new_key: SignedKey = serde_json::from_str(&new_key)?;
437
438        // Pick out the Curve keys, we don't care about the rest that much.
439        let old_key = old_key.key();
440        let new_key = new_key.key();
441
442        Ok(Self { old_key, new_key })
443    }
444}
445
446impl Client {
    /// Get a read guard for the [`OlmMachine`] of the base client.
    ///
    /// The guarded value is an `Option`; callers in this file treat `None` as
    /// "no olm machine available".
    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.base_client().olm_machine().await
    }
450
451    pub(crate) async fn mark_request_as_sent(
452        &self,
453        request_id: &TransactionId,
454        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
455    ) -> Result<(), matrix_sdk_base::Error> {
456        Ok(self
457            .olm_machine()
458            .await
459            .as_ref()
460            .expect(
461                "We should have an olm machine once we try to mark E2EE related requests as sent",
462            )
463            .mark_request_as_sent(request_id, response)
464            .await?)
465    }
466
467    /// Query the server for users device keys.
468    ///
469    /// # Panics
470    ///
471    /// Panics if no key query needs to be done.
472    #[instrument(skip(self, device_keys))]
473    pub(crate) async fn keys_query(
474        &self,
475        request_id: &TransactionId,
476        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
477    ) -> Result<get_keys::v3::Response> {
478        let request = assign!(get_keys::v3::Request::new(), { device_keys });
479
480        let response = self.send(request).await?;
481        self.mark_request_as_sent(request_id, &response).await?;
482        self.encryption().update_state_after_keys_query(&response).await;
483
484        Ok(response)
485    }
486
    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
    /// encrypting and uploading a provided reader.
    ///
    /// This only builds the [`UploadEncryptedFile`] request; see the example
    /// below for how it is awaited.
    ///
    /// # Arguments
    ///
    /// * `reader` - The reader that should be encrypted and uploaded.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use matrix_sdk::Client;
    /// # use url::Url;
    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
    /// use serde::{Deserialize, Serialize};
    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
    ///
    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
    /// struct CustomEventContent {
    ///     encrypted_file: EncryptedFile,
    /// }
    ///
    /// # async {
    /// # let homeserver = Url::parse("http://example.com")?;
    /// # let client = Client::new(homeserver).await?;
    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
    /// let encrypted_file = client.upload_encrypted_file(&mut reader).await?;
    ///
    /// room.send(CustomEventContent { encrypted_file }).await?;
    /// # anyhow::Ok(()) };
    /// ```
    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
        &'a self,
        reader: &'a mut R,
    ) -> UploadEncryptedFile<'a, R> {
        UploadEncryptedFile::new(self, reader)
    }
526
527    /// Encrypt and upload the file and thumbnails, and return the source
528    /// information.
529    pub(crate) async fn upload_encrypted_media_and_thumbnail(
530        &self,
531        data: &[u8],
532        thumbnail: Option<Thumbnail>,
533        send_progress: SharedObservable<TransmissionProgress>,
534    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
535        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
536
537        let upload_attachment = async {
538            let mut cursor = Cursor::new(data);
539            self.upload_encrypted_file(&mut cursor)
540                .with_send_progress_observable(send_progress)
541                .await
542        };
543
544        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
545
546        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
547    }
548
549    /// Uploads an encrypted thumbnail to the media repository, and returns
550    /// its source and extra information.
551    async fn upload_encrypted_thumbnail(
552        &self,
553        thumbnail: Option<Thumbnail>,
554        send_progress: SharedObservable<TransmissionProgress>,
555    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
556        let Some(thumbnail) = thumbnail else {
557            return Ok(None);
558        };
559
560        let (data, _, thumbnail_info) = thumbnail.into_parts();
561        let mut cursor = Cursor::new(data);
562
563        let file = self
564            .upload_encrypted_file(&mut cursor)
565            .with_send_progress_observable(send_progress)
566            .await?;
567
568        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
569    }
570
571    /// Claim one-time keys creating new Olm sessions.
572    ///
573    /// # Arguments
574    ///
575    /// * `users` - The list of user/device pairs that we should claim keys for.
576    pub(crate) async fn claim_one_time_keys(
577        &self,
578        users: impl Iterator<Item = &UserId>,
579    ) -> Result<()> {
580        let _lock = self.locks().key_claim_lock.lock().await;
581
582        if let Some((request_id, request)) = self
583            .olm_machine()
584            .await
585            .as_ref()
586            .ok_or(Error::NoOlmMachine)?
587            .get_missing_sessions(users)
588            .await?
589        {
590            let response = self.send(request).await?;
591            self.mark_request_as_sent(&request_id, &response).await?;
592        }
593
594        Ok(())
595    }
596
597    /// Upload the E2E encryption keys.
598    ///
599    /// This uploads the long lived device keys as well as the required amount
600    /// of one-time keys.
601    ///
602    /// # Panics
603    ///
604    /// Panics if the client isn't logged in, or if no encryption keys need to
605    /// be uploaded.
606    #[instrument(skip(self, request))]
607    pub(crate) async fn keys_upload(
608        &self,
609        request_id: &TransactionId,
610        request: &upload_keys::v3::Request,
611    ) -> Result<upload_keys::v3::Response> {
612        debug!(
613            device_keys = request.device_keys.is_some(),
614            one_time_key_count = request.one_time_keys.len(),
615            "Uploading public encryption keys",
616        );
617
618        let response = self.send(request.clone()).await?;
619        self.mark_request_as_sent(request_id, &response).await?;
620
621        Ok(response)
622    }
623
624    pub(crate) async fn room_send_helper(
625        &self,
626        request: &RoomMessageRequest,
627    ) -> Result<send_message_event::v3::Response> {
628        let content = request.content.clone();
629        let txn_id = request.txn_id.clone();
630        let room_id = &request.room_id;
631
632        self.get_room(room_id)
633            .expect("Can't send a message to a room that isn't known to the store")
634            .send(*content)
635            .with_transaction_id(txn_id)
636            .await
637    }
638
639    pub(crate) async fn send_to_device(
640        &self,
641        request: &ToDeviceRequest,
642    ) -> HttpResult<ToDeviceResponse> {
643        let request = RumaToDeviceRequest::new_raw(
644            request.event_type.clone(),
645            request.txn_id.clone(),
646            request.messages.clone(),
647        );
648
649        self.send(request).await
650    }
651
652    pub(crate) async fn send_verification_request(
653        &self,
654        request: OutgoingVerificationRequest,
655    ) -> Result<()> {
656        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
657
658        match request {
659            ToDevice(t) => {
660                self.send_to_device(&t).await?;
661            }
662            InRoom(r) => {
663                self.room_send_helper(&r).await?;
664            }
665        }
666
667        Ok(())
668    }
669
670    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
671        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
672
673        match r.request() {
674            AnyOutgoingRequest::KeysQuery(request) => {
675                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
676            }
677            AnyOutgoingRequest::KeysUpload(request) => {
678                let response = self.keys_upload(r.request_id(), request).await;
679
680                if let Err(e) = &response {
681                    match e.as_ruma_api_error() {
682                        Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
683                            if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
684                            {
685                                // This is one of the nastiest errors we can have. The server
686                                // telling us that we already have a one-time key uploaded means
687                                // that we forgot about some of our one-time keys. This will lead to
688                                // UTDs.
689                                {
690                                    let already_reported = self
691                                        .state_store()
692                                        .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
693                                        .await?
694                                        .is_some();
695
696                                    if message.starts_with("One time key") && !already_reported {
697                                        if let Ok(message) =
698                                            DuplicateOneTimeKeyErrorMessage::from_str(message)
699                                        {
700                                            error!(
701                                                sentry = true,
702                                                old_key = %message.old_key,
703                                                new_key = %message.new_key,
704                                                "Duplicate one-time keys have been uploaded"
705                                            );
706                                        } else {
707                                            error!(
708                                                sentry = true,
709                                                "Duplicate one-time keys have been uploaded"
710                                            );
711                                        }
712
713                                        self.state_store()
714                                            .set_kv_data(
715                                                StateStoreDataKey::OneTimeKeyAlreadyUploaded,
716                                                StateStoreDataValue::OneTimeKeyAlreadyUploaded,
717                                            )
718                                            .await?;
719                                    }
720                                }
721                            }
722                        }
723                        _ => {}
724                    }
725
726                    response?;
727                }
728            }
729            AnyOutgoingRequest::ToDeviceRequest(request) => {
730                let response = self.send_to_device(request).await?;
731                self.mark_request_as_sent(r.request_id(), &response).await?;
732            }
733            AnyOutgoingRequest::SignatureUpload(request) => {
734                let response = self.send(request.clone()).await?;
735                self.mark_request_as_sent(r.request_id(), &response).await?;
736            }
737            AnyOutgoingRequest::RoomMessage(request) => {
738                let response = self.room_send_helper(request).await?;
739                self.mark_request_as_sent(r.request_id(), &response).await?;
740            }
741            AnyOutgoingRequest::KeysClaim(request) => {
742                let response = self.send(request.clone()).await?;
743                self.mark_request_as_sent(r.request_id(), &response).await?;
744            }
745        }
746
747        Ok(())
748    }
749
750    #[instrument(skip_all)]
751    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
752        const MAX_CONCURRENT_REQUESTS: usize = 20;
753
754        // This is needed because sometimes we need to automatically
755        // claim some one-time keys to unwedge an existing Olm session.
756        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
757            warn!("Error while claiming one-time keys {:?}", e);
758        }
759
760        let outgoing_requests = stream::iter(
761            self.olm_machine()
762                .await
763                .as_ref()
764                .ok_or(Error::NoOlmMachine)?
765                .outgoing_requests()
766                .await?,
767        )
768        .map(|r| self.send_outgoing_request(r));
769
770        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
771
772        requests
773            .for_each(|r| async move {
774                match r {
775                    Ok(_) => (),
776                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
777                }
778            })
779            .await;
780
781        Ok(())
782    }
783}
784
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Get the olm machine, for testing purposes only.
    ///
    /// This is a public wrapper around the crate-private `olm_machine()`
    /// method, only compiled for tests or with the `testing` feature.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.olm_machine().await
    }
}
792
/// A high-level API to manage the client's encryption.
///
/// To get this, use [`Client::encryption()`].
#[derive(Debug, Clone)]
pub struct Encryption {
    /// The underlying client.
    client: Client,
}
801
802impl Encryption {
    /// Create a new `Encryption` API object wrapping the given client.
    pub(crate) fn new(client: Client) -> Self {
        Self { client }
    }
806
    /// Returns the current encryption settings for this client.
    ///
    /// The settings are returned by value, [`EncryptionSettings`] is `Copy`.
    pub(crate) fn settings(&self) -> EncryptionSettings {
        self.client.inner.e2ee.encryption_settings
    }
811
812    /// Get the public ed25519 key of our own device. This is usually what is
813    /// called the fingerprint of the device.
814    pub async fn ed25519_key(&self) -> Option<String> {
815        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
816    }
817
818    /// Get the public Curve25519 key of our own device.
819    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
820        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
821    }
822
823    /// Get the current device creation timestamp.
824    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
825        match self.get_own_device().await {
826            Ok(Some(device)) => device.first_time_seen_ts(),
827            // Should not happen, there should always be an own device
828            _ => MilliSecondsSinceUnixEpoch::now(),
829        }
830    }
831
832    pub(crate) async fn import_secrets_bundle(
833        &self,
834        bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
835    ) -> Result<(), SecretImportError> {
836        let olm_machine = self.client.olm_machine().await;
837        let olm_machine =
838            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
839
840        olm_machine.store().import_secrets_bundle(bundle).await
841    }
842
843    /// Get the status of the private cross signing keys.
844    ///
845    /// This can be used to check which private cross signing keys we have
846    /// stored locally.
847    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
848        let olm = self.client.olm_machine().await;
849        let machine = olm.as_ref()?;
850        Some(machine.cross_signing_status().await)
851    }
852
853    /// Does the user have other devices that the current device can verify
854    /// against?
855    ///
856    /// The device must be signed by the user's cross-signing key, must have an
857    /// identity, and must not be a dehydrated device.
858    pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
859        let olm_machine = self.client.olm_machine().await;
860        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
861        let user_id = olm_machine.user_id();
862
863        self.ensure_initial_key_query().await?;
864
865        let devices = self.get_user_devices(user_id).await?;
866
867        let ret = devices.devices().any(|device| {
868            device.is_cross_signed_by_owner()
869                && device.curve25519_key().is_some()
870                && !device.is_dehydrated()
871        });
872
873        Ok(ret)
874    }
875
876    /// Get all the tracked users we know about
877    ///
878    /// Tracked users are users for which we keep the device list of E2EE
879    /// capable devices up to date.
880    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
881        if let Some(machine) = self.client.olm_machine().await.as_ref() {
882            machine.tracked_users().await
883        } else {
884            Ok(HashSet::new())
885        }
886    }
887
888    /// Get a [`Subscriber`] for the [`VerificationState`].
889    ///
890    /// # Examples
891    ///
892    /// ```no_run
893    /// use matrix_sdk::{Client, encryption};
894    /// use url::Url;
895    ///
896    /// # async {
897    /// let homeserver = Url::parse("http://example.com")?;
898    /// let client = Client::new(homeserver).await?;
899    /// let mut subscriber = client.encryption().verification_state();
900    ///
901    /// let current_value = subscriber.get();
902    ///
903    /// println!("The current verification state is: {current_value:?}");
904    ///
905    /// if let Some(verification_state) = subscriber.next().await {
906    ///     println!("Received verification state update {:?}", verification_state)
907    /// }
908    /// # anyhow::Ok(()) };
909    /// ```
910    pub fn verification_state(&self) -> Subscriber<VerificationState> {
911        self.client.inner.verification_state.subscribe_reset()
912    }
913
914    /// Get a verification object with the given flow id.
915    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
916        let olm = self.client.olm_machine().await;
917        let olm = olm.as_ref()?;
918        #[allow(clippy::bind_instead_of_map)]
919        olm.get_verification(user_id, flow_id).and_then(|v| match v {
920            matrix_sdk_base::crypto::Verification::SasV1(sas) => {
921                Some(SasVerification { inner: sas, client: self.client.clone() }.into())
922            }
923            #[cfg(feature = "qrcode")]
924            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
925                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
926            }
927            _ => None,
928        })
929    }
930
931    /// Get a `VerificationRequest` object for the given user with the given
932    /// flow id.
933    pub async fn get_verification_request(
934        &self,
935        user_id: &UserId,
936        flow_id: impl AsRef<str>,
937    ) -> Option<VerificationRequest> {
938        let olm = self.client.olm_machine().await;
939        let olm = olm.as_ref()?;
940
941        olm.get_verification_request(user_id, flow_id)
942            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
943    }
944
945    /// Get a specific device of a user.
946    ///
947    /// # Arguments
948    ///
949    /// * `user_id` - The unique id of the user that the device belongs to.
950    ///
951    /// * `device_id` - The unique id of the device.
952    ///
953    /// Returns a `Device` if one is found and the crypto store didn't throw an
954    /// error.
955    ///
956    /// This will always return None if the client hasn't been logged in.
957    ///
958    /// # Examples
959    ///
960    /// ```no_run
961    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
962    /// # use url::Url;
963    /// # async {
964    /// # let alice = user_id!("@alice:example.org");
965    /// # let homeserver = Url::parse("http://example.com")?;
966    /// # let client = Client::new(homeserver).await?;
967    /// if let Some(device) =
968    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
969    /// {
970    ///     println!("{:?}", device.is_verified());
971    ///
972    ///     if !device.is_verified() {
973    ///         let verification = device.request_verification().await?;
974    ///     }
975    /// }
976    /// # anyhow::Ok(()) };
977    /// ```
978    pub async fn get_device(
979        &self,
980        user_id: &UserId,
981        device_id: &DeviceId,
982    ) -> Result<Option<Device>, CryptoStoreError> {
983        let olm = self.client.olm_machine().await;
984        let Some(machine) = olm.as_ref() else { return Ok(None) };
985        let device = machine.get_device(user_id, device_id, None).await?;
986        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
987    }
988
989    /// A convenience method to retrieve your own device from the store.
990    ///
991    /// This is the same as calling [`Encryption::get_device()`] with your own
992    /// user and device ID.
993    ///
994    /// This will always return a device, unless you are not logged in.
995    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
996        let olm = self.client.olm_machine().await;
997        let Some(machine) = olm.as_ref() else { return Ok(None) };
998        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
999        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1000    }
1001
    /// Get a map holding all the devices of a user.
1003    ///
1004    /// This will always return an empty map if the client hasn't been logged
1005    /// in.
1006    ///
1007    /// # Arguments
1008    ///
1009    /// * `user_id` - The unique id of the user that the devices belong to.
1010    ///
1011    /// # Examples
1012    ///
1013    /// ```no_run
1014    /// # use matrix_sdk::{Client, ruma::user_id};
1015    /// # use url::Url;
1016    /// # async {
1017    /// # let alice = user_id!("@alice:example.org");
1018    /// # let homeserver = Url::parse("http://example.com")?;
1019    /// # let client = Client::new(homeserver).await?;
1020    /// let devices = client.encryption().get_user_devices(alice).await?;
1021    ///
1022    /// for device in devices.devices() {
1023    ///     println!("{device:?}");
1024    /// }
1025    /// # anyhow::Ok(()) };
1026    /// ```
1027    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1028        let devices = self
1029            .client
1030            .olm_machine()
1031            .await
1032            .as_ref()
1033            .ok_or(Error::NoOlmMachine)?
1034            .get_user_devices(user_id, None)
1035            .await?;
1036
1037        Ok(UserDevices { inner: devices, client: self.client.clone() })
1038    }
1039
1040    /// Get the E2EE identity of a user from the crypto store.
1041    ///
1042    /// Usually, we only have the E2EE identity of a user locally if the user
1043    /// is tracked, meaning that we are both members of the same encrypted room.
1044    ///
1045    /// To get the E2EE identity of a user even if it is not available locally
1046    /// use [`Encryption::request_user_identity()`].
1047    ///
1048    /// # Arguments
1049    ///
1050    /// * `user_id` - The unique id of the user that the identity belongs to.
1051    ///
1052    /// Returns a `UserIdentity` if one is found and the crypto store
1053    /// didn't throw an error.
1054    ///
1055    /// This will always return None if the client hasn't been logged in.
1056    ///
1057    /// # Examples
1058    ///
1059    /// ```no_run
1060    /// # use matrix_sdk::{Client, ruma::user_id};
1061    /// # use url::Url;
1062    /// # async {
1063    /// # let alice = user_id!("@alice:example.org");
1064    /// # let homeserver = Url::parse("http://example.com")?;
1065    /// # let client = Client::new(homeserver).await?;
1066    /// let user = client.encryption().get_user_identity(alice).await?;
1067    ///
1068    /// if let Some(user) = user {
1069    ///     println!("{:?}", user.is_verified());
1070    ///
1071    ///     let verification = user.request_verification().await?;
1072    /// }
1073    /// # anyhow::Ok(()) };
1074    /// ```
1075    pub async fn get_user_identity(
1076        &self,
1077        user_id: &UserId,
1078    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1079        let olm = self.client.olm_machine().await;
1080        let Some(olm) = olm.as_ref() else { return Ok(None) };
1081        let identity = olm.get_identity(user_id, None).await?;
1082
1083        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1084    }
1085
1086    /// Get the E2EE identity of a user from the homeserver.
1087    ///
1088    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
1089    /// E2EE identity is not found, it should mean that the user did not set
1090    /// up cross-signing.
1091    ///
1092    /// If you want the E2EE identity of a user without making a request to the
1093    /// homeserver, use [`Encryption::get_user_identity()`] instead.
1094    ///
1095    /// # Arguments
1096    ///
1097    /// * `user_id` - The ID of the user that the identity belongs to.
1098    ///
1099    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
1100    /// was an issue with the crypto store or with the request to the
1101    /// homeserver.
1102    ///
1103    /// This will always return `None` if the client hasn't been logged in.
1104    ///
1105    /// # Examples
1106    ///
1107    /// ```no_run
1108    /// # use matrix_sdk::{Client, ruma::user_id};
1109    /// # use url::Url;
1110    /// # async {
1111    /// # let alice = user_id!("@alice:example.org");
1112    /// # let homeserver = Url::parse("http://example.com")?;
1113    /// # let client = Client::new(homeserver).await?;
1114    /// let user = client.encryption().request_user_identity(alice).await?;
1115    ///
1116    /// if let Some(user) = user {
1117    ///     println!("User is verified: {:?}", user.is_verified());
1118    ///
1119    ///     let verification = user.request_verification().await?;
1120    /// }
1121    /// # anyhow::Ok(()) };
1122    /// ```
1123    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1124        let olm = self.client.olm_machine().await;
1125        let Some(olm) = olm.as_ref() else { return Ok(None) };
1126
1127        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1128        self.client.keys_query(&request_id, request.device_keys).await?;
1129
1130        let identity = olm.get_identity(user_id, None).await?;
1131        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1132    }
1133
1134    /// Returns a stream of device updates, allowing users to listen for
1135    /// notifications about new or changed devices.
1136    ///
1137    /// The stream produced by this method emits updates whenever a new device
1138    /// is discovered or when an existing device's information is changed. Users
1139    /// can subscribe to this stream and receive updates in real-time.
1140    ///
1141    /// # Examples
1142    ///
1143    /// ```no_run
1144    /// # use matrix_sdk::Client;
1145    /// # use ruma::{device_id, user_id};
1146    /// # use futures_util::{pin_mut, StreamExt};
1147    /// # let client: Client = unimplemented!();
1148    /// # async {
1149    /// let devices_stream = client.encryption().devices_stream().await?;
1150    /// let user_id = client
1151    ///     .user_id()
1152    ///     .expect("We should know our user id after we have logged in");
1153    /// pin_mut!(devices_stream);
1154    ///
    /// while let Some(device_updates) = devices_stream.next().await {
1156    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1157    ///         for device in user_devices.values() {
1158    ///             println!("A new device has been added {}", device.device_id());
1159    ///         }
1160    ///     }
1161    /// }
1162    /// # anyhow::Ok(()) };
1163    /// ```
1164    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1165        let olm = self.client.olm_machine().await;
1166        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1167        let client = self.client.to_owned();
1168
1169        Ok(olm
1170            .store()
1171            .devices_stream()
1172            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1173    }
1174
1175    /// Returns a stream of user identity updates, allowing users to listen for
1176    /// notifications about new or changed user identities.
1177    ///
1178    /// The stream produced by this method emits updates whenever a new user
    /// identity is discovered or when an existing identity's information is
1180    /// changed. Users can subscribe to this stream and receive updates in
1181    /// real-time.
1182    ///
1183    /// # Examples
1184    ///
1185    /// ```no_run
1186    /// # use matrix_sdk::Client;
1187    /// # use ruma::{device_id, user_id};
1188    /// # use futures_util::{pin_mut, StreamExt};
1189    /// # let client: Client = unimplemented!();
1190    /// # async {
1191    /// let identities_stream =
1192    ///     client.encryption().user_identities_stream().await?;
1193    /// pin_mut!(identities_stream);
1194    ///
    /// while let Some(identity_updates) = identities_stream.next().await {
1196    ///     for (_, identity) in identity_updates.new {
1197    ///         println!("A new identity has been added {}", identity.user_id());
1198    ///     }
1199    /// }
1200    /// # anyhow::Ok(()) };
1201    /// ```
1202    pub async fn user_identities_stream(
1203        &self,
1204    ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1205        let olm = self.client.olm_machine().await;
1206        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1207        let client = self.client.to_owned();
1208
1209        Ok(olm
1210            .store()
1211            .user_identities_stream()
1212            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1213    }
1214
1215    /// Create and upload a new cross signing identity.
1216    ///
1217    /// # Arguments
1218    ///
1219    /// * `auth_data` - This request requires user interactive auth, the first
1220    ///   request needs to set this to `None` and will always fail with an
1221    ///   `UiaaResponse`. The response will contain information for the
1222    ///   interactive auth and the same request needs to be made but this time
1223    ///   with some `auth_data` provided.
1224    ///
1225    /// # Examples
1226    ///
1227    /// ```no_run
1228    /// # use std::collections::BTreeMap;
1229    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1230    /// # use url::Url;
1231    /// # use serde_json::json;
1232    /// # async {
1233    /// # let homeserver = Url::parse("http://example.com")?;
1234    /// # let client = Client::new(homeserver).await?;
1235    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1236    ///     if let Some(response) = e.as_uiaa_response() {
1237    ///         let mut password = uiaa::Password::new(
1238    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1239    ///             "wordpass".to_owned(),
1240    ///         );
1241    ///         password.session = response.session.clone();
1242    ///
1243    ///         client
1244    ///             .encryption()
1245    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1246    ///             .await
1247    ///             .expect("Couldn't bootstrap cross signing")
1248    ///     } else {
1249    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1250    ///     }
1251    /// }
    /// # anyhow::Ok(()) };
    /// ```
1253    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1254        let olm = self.client.olm_machine().await;
1255        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1256
1257        let CrossSigningBootstrapRequests {
1258            upload_signing_keys_req,
1259            upload_keys_req,
1260            upload_signatures_req,
1261        } = olm.bootstrap_cross_signing(false).await?;
1262
1263        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1264            auth: auth_data,
1265            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1266            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1267            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1268        });
1269
1270        if let Some(req) = upload_keys_req {
1271            self.client.send_outgoing_request(req).await?;
1272        }
1273        self.client.send(upload_signing_keys_req).await?;
1274        self.client.send(upload_signatures_req).await?;
1275
1276        Ok(())
1277    }
1278
1279    /// Reset the cross-signing keys.
1280    ///
1281    /// # Example
1282    ///
1283    /// ```no_run
1284    /// # use matrix_sdk::{ruma::api::client::uiaa, Client, encryption::CrossSigningResetAuthType};
1285    /// # use url::Url;
1286    /// # async {
1287    /// # let homeserver = Url::parse("http://example.com")?;
1288    /// # let client = Client::new(homeserver).await?;
1289    /// # let user_id = unimplemented!();
1290    /// let encryption = client.encryption();
1291    ///
1292    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1293    ///     match handle.auth_type() {
1294    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1295    ///             use matrix_sdk::ruma::api::client::uiaa;
1296    ///
1297    ///             let password = "1234".to_owned();
1298    ///             let mut password = uiaa::Password::new(user_id, password);
1299    ///             password.session = uiaa.session;
1300    ///
1301    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1302    ///         }
1303    ///         CrossSigningResetAuthType::OAuth(o) => {
1304    ///             println!(
1305    ///                 "To reset your end-to-end encryption cross-signing identity, \
1306    ///                 you first need to approve it at {}",
1307    ///                 o.approval_url
1308    ///             );
1309    ///             handle.auth(None).await?;
1310    ///         }
1311    ///     }
1312    /// }
1313    /// # anyhow::Ok(()) };
1314    /// ```
1315    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1316        let olm = self.client.olm_machine().await;
1317        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1318
1319        let CrossSigningBootstrapRequests {
1320            upload_keys_req,
1321            upload_signing_keys_req,
1322            upload_signatures_req,
1323        } = olm.bootstrap_cross_signing(true).await?;
1324
1325        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1326            auth: None,
1327            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1328            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1329            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1330        });
1331
1332        if let Some(req) = upload_keys_req {
1333            self.client.send_outgoing_request(req).await?;
1334        }
1335
1336        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1337            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1338                let client = self.client.clone();
1339
1340                Ok(Some(CrossSigningResetHandle::new(
1341                    client,
1342                    upload_signing_keys_req,
1343                    upload_signatures_req,
1344                    auth_type,
1345                )))
1346            } else {
1347                Err(error.into())
1348            }
1349        } else {
1350            self.client.send(upload_signatures_req).await?;
1351
1352            Ok(None)
1353        }
1354    }
1355
1356    /// Query the user's own device keys, if, and only if, we didn't have their
1357    /// identity in the first place.
1358    async fn ensure_initial_key_query(&self) -> Result<()> {
1359        let olm_machine = self.client.olm_machine().await;
1360        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1361
1362        let user_id = olm_machine.user_id();
1363
1364        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1365            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1366            self.client.keys_query(&request_id, request.device_keys).await?;
1367        }
1368
1369        Ok(())
1370    }
1371
1372    /// Create and upload a new cross signing identity, if that has not been
1373    /// done yet.
1374    ///
1375    /// This will only create a new cross-signing identity if the user had never
1376    /// done it before. If the user did it before, then this is a no-op.
1377    ///
1378    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1379    /// behavior of this function.
1380    ///
1381    /// # Arguments
1382    ///
1383    /// * `auth_data` - This request requires user interactive auth, the first
1384    ///   request needs to set this to `None` and will always fail with an
1385    ///   `UiaaResponse`. The response will contain information for the
1386    ///   interactive auth and the same request needs to be made but this time
1387    ///   with some `auth_data` provided.
1388    ///
1389    /// # Examples
1390    /// ```no_run
1391    /// # use std::collections::BTreeMap;
1392    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1393    /// # use url::Url;
1394    /// # use serde_json::json;
1395    /// # async {
1396    /// # let homeserver = Url::parse("http://example.com")?;
1397    /// # let client = Client::new(homeserver).await?;
1398    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1399    ///     if let Some(response) = e.as_uiaa_response() {
1400    ///         let mut password = uiaa::Password::new(
1401    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1402    ///             "wordpass".to_owned(),
1403    ///         );
1404    ///         password.session = response.session.clone();
1405    ///
1406    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1407    ///         // avoid checks.
1408    ///         client
1409    ///             .encryption()
1410    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1411    ///             .await
1412    ///             .expect("Couldn't bootstrap cross signing")
1413    ///     } else {
1414    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1415    ///     }
1416    /// }
    /// # anyhow::Ok(()) };
    /// ```
1418    pub async fn bootstrap_cross_signing_if_needed(
1419        &self,
1420        auth_data: Option<AuthData>,
1421    ) -> Result<()> {
1422        let olm_machine = self.client.olm_machine().await;
1423        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1424        let user_id = olm_machine.user_id();
1425
1426        self.ensure_initial_key_query().await?;
1427
1428        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1429            self.bootstrap_cross_signing(auth_data).await?;
1430        }
1431
1432        Ok(())
1433    }
1434
1435    /// Export E2EE keys that match the given predicate encrypting them with the
1436    /// given passphrase.
1437    ///
1438    /// # Arguments
1439    ///
1440    /// * `path` - The file path where the exported key file will be saved.
1441    ///
1442    /// * `passphrase` - The passphrase that will be used to encrypt the
1443    ///   exported room keys.
1444    ///
1445    /// * `predicate` - A closure that will be called for every known
1446    ///   `InboundGroupSession`, which represents a room key. If the closure
    ///   returns `true` the `InboundGroupSession` will be included in the
1448    ///   export, if the closure returns `false` it will not be included.
1449    ///
1450    /// # Panics
1451    ///
1452    /// This method will panic if it isn't run on a Tokio runtime.
1453    ///
1454    /// This method will panic if it can't get enough randomness from the OS to
1455    /// encrypt the exported keys securely.
1456    ///
1457    /// # Examples
1458    ///
1459    /// ```no_run
1460    /// # use std::{path::PathBuf, time::Duration};
1461    /// # use matrix_sdk::{
1462    /// #     Client, config::SyncSettings,
1463    /// #     ruma::room_id,
1464    /// # };
1465    /// # use url::Url;
1466    /// # async {
1467    /// # let homeserver = Url::parse("http://localhost:8080")?;
1468    /// # let mut client = Client::new(homeserver).await?;
1469    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1470    /// // Export all room keys.
1471    /// client
1472    ///     .encryption()
1473    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1474    ///     .await?;
1475    ///
1476    /// // Export only the room keys for a certain room.
1477    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1478    /// let room_id = room_id!("!test:localhost");
1479    ///
1480    /// client
1481    ///     .encryption()
1482    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1483    ///     .await?;
1484    /// # anyhow::Ok(()) };
1485    /// ```
1486    #[cfg(not(target_family = "wasm"))]
1487    pub async fn export_room_keys(
1488        &self,
1489        path: PathBuf,
1490        passphrase: &str,
1491        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1492    ) -> Result<()> {
1493        let olm = self.client.olm_machine().await;
1494        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1495
1496        let keys = olm.store().export_room_keys(predicate).await?;
1497        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1498
1499        let encrypt = move || -> Result<()> {
1500            let export: String =
1501                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1502            let mut file = std::fs::File::create(path)?;
1503            file.write_all(&export.into_bytes())?;
1504            Ok(())
1505        };
1506
1507        let task = tokio::task::spawn_blocking(encrypt);
1508        task.await.expect("Task join error")
1509    }
1510
1511    /// Import E2EE keys from the given file path.
1512    ///
1513    /// # Arguments
1514    ///
    /// * `path` - The file path where the exported key file can be found.
1516    ///
1517    /// * `passphrase` - The passphrase that should be used to decrypt the
1518    ///   exported room keys.
1519    ///
1520    /// Returns a tuple of numbers that represent the number of sessions that
1521    /// were imported and the total number of sessions that were found in the
1522    /// key export.
1523    ///
1524    /// # Panics
1525    ///
1526    /// This method will panic if it isn't run on a Tokio runtime.
    ///
    /// # Examples
    ///
1528    /// ```no_run
1529    /// # use std::{path::PathBuf, time::Duration};
1530    /// # use matrix_sdk::{
1531    /// #     Client, config::SyncSettings,
1532    /// #     ruma::room_id,
1533    /// # };
1534    /// # use url::Url;
1535    /// # async {
1536    /// # let homeserver = Url::parse("http://localhost:8080")?;
1537    /// # let mut client = Client::new(homeserver).await?;
1538    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1539    /// let result =
1540    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1541    ///
1542    /// println!(
1543    ///     "Imported {} room keys out of {}",
1544    ///     result.imported_count, result.total_count
1545    /// );
1546    /// # anyhow::Ok(()) };
1547    /// ```
1548    #[cfg(not(target_family = "wasm"))]
1549    pub async fn import_room_keys(
1550        &self,
1551        path: PathBuf,
1552        passphrase: &str,
1553    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1554        let olm = self.client.olm_machine().await;
1555        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1556        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1557
1558        let decrypt = move || {
1559            let file = std::fs::File::open(path)?;
1560            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1561        };
1562
1563        let task = tokio::task::spawn_blocking(decrypt);
1564        let import = task.await.expect("Task join error")?;
1565
1566        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1567
1568        self.backups().maybe_trigger_backup();
1569
1570        Ok(ret)
1571    }
1572
1573    /// Receive notifications of room keys being received as a [`Stream`].
1574    ///
1575    /// Each time a room key is updated in any way, an update will be sent to
1576    /// the stream. Updates that happen at the same time are batched into a
1577    /// [`Vec`].
1578    ///
1579    /// If the reader of the stream lags too far behind, an error is broadcast
1580    /// containing the number of skipped items.
1581    ///
1582    /// # Examples
1583    ///
1584    /// ```no_run
1585    /// # use matrix_sdk::Client;
1586    /// # use url::Url;
1587    /// # async {
1588    /// # let homeserver = Url::parse("http://example.com")?;
1589    /// # let client = Client::new(homeserver).await?;
1590    /// use futures_util::StreamExt;
1591    ///
1592    /// let Some(mut room_keys_stream) =
1593    ///     client.encryption().room_keys_received_stream().await
1594    /// else {
1595    ///     return Ok(());
1596    /// };
1597    ///
1598    /// while let Some(update) = room_keys_stream.next().await {
1599    ///     println!("Received room keys {update:?}");
1600    /// }
1601    /// # anyhow::Ok(()) };
1602    /// ```
1603    pub async fn room_keys_received_stream(
1604        &self,
1605    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1606    {
1607        let olm = self.client.olm_machine().await;
1608        let olm = olm.as_ref()?;
1609
1610        Some(olm.store().room_keys_received_stream())
1611    }
1612
1613    /// Receive notifications of historic room key bundles as a [`Stream`].
1614    ///
1615    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1616    ///
1617    /// Each time a historic room key bundle was received, an update will be
1618    /// sent to the stream. This stream is useful for informative purposes
1619    /// exclusively, historic room key bundles are handled by the SDK
1620    /// automatically.
1621    ///
1622    /// # Examples
1623    ///
1624    /// ```no_run
1625    /// # use matrix_sdk::Client;
1626    /// # use url::Url;
1627    /// # async {
1628    /// # let homeserver = Url::parse("http://example.com")?;
1629    /// # let client = Client::new(homeserver).await?;
1630    /// use futures_util::StreamExt;
1631    ///
1632    /// let Some(mut bundle_stream) =
1633    ///     client.encryption().historic_room_key_stream().await
1634    /// else {
1635    ///     return Ok(());
1636    /// };
1637    ///
1638    /// while let Some(bundle_info) = bundle_stream.next().await {
1639    ///     println!("Received a historic room key bundle {bundle_info:?}");
1640    /// }
1641    /// # anyhow::Ok(()) };
1642    /// ```
1643    pub async fn historic_room_key_stream(
1644        &self,
1645    ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1646        let olm = self.client.olm_machine().await;
1647        let olm = olm.as_ref()?;
1648
1649        Some(olm.store().historic_room_key_stream())
1650    }
1651
1652    /// Get the secret storage manager of the client.
1653    pub fn secret_storage(&self) -> SecretStorage {
1654        SecretStorage { client: self.client.to_owned() }
1655    }
1656
1657    /// Get the backups manager of the client.
1658    pub fn backups(&self) -> Backups {
1659        Backups { client: self.client.to_owned() }
1660    }
1661
1662    /// Get the recovery manager of the client.
1663    pub fn recovery(&self) -> Recovery {
1664        Recovery { client: self.client.to_owned() }
1665    }
1666
1667    /// Enables the crypto-store cross-process lock.
1668    ///
1669    /// This may be required if there are multiple processes that may do writes
1670    /// to the same crypto store. In that case, it's necessary to create a
1671    /// lock, so that only one process writes to it, otherwise this may
1672    /// cause confusing issues because of stale data contained in in-memory
1673    /// caches.
1674    ///
1675    /// The provided `lock_value` must be a unique identifier for this process.
1676    /// Check [`Client::cross_process_store_locks_holder_name`] to
1677    /// get the global value.
1678    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1679        // If the lock has already been created, don't recreate it from scratch.
1680        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1681            let prev_holder = prev_lock.lock_holder();
1682            if prev_holder == lock_value {
1683                return Ok(());
1684            }
1685            warn!(
1686                "Recreating cross-process store lock with a different holder value: \
1687                 prev was {prev_holder}, new is {lock_value}"
1688            );
1689        }
1690
1691        let olm_machine = self.client.base_client().olm_machine().await;
1692        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1693
1694        let lock =
1695            olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1696
1697        // Gently try to initialize the crypto store generation counter.
1698        //
1699        // If we don't get the lock immediately, then it is already acquired by another
1700        // process, and we'll get to reload next time we acquire the lock.
1701        {
1702            let lock_result = lock.try_lock_once().await?;
1703
1704            if lock_result.is_ok() {
1705                olm_machine
1706                    .initialize_crypto_store_generation(
1707                        &self.client.locks().crypto_store_generation,
1708                    )
1709                    .await?;
1710            }
1711        }
1712
1713        self.client
1714            .locks()
1715            .cross_process_crypto_store_lock
1716            .set(lock)
1717            .map_err(|_| Error::BadCryptoStoreState)?;
1718
1719        Ok(())
1720    }
1721
1722    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1723    /// time.
1724    ///
1725    /// Returns the current generation number.
1726    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1727        let olm_machine_guard = self.client.olm_machine().await;
1728        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1729            let (new_gen, generation_number) = olm_machine
1730                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1731                .await?;
1732            // If the crypto store generation has changed,
1733            if new_gen {
1734                // (get rid of the reference to the current crypto store first)
1735                drop(olm_machine_guard);
1736                // Recreate the OlmMachine.
1737                self.client.base_client().regenerate_olm(None).await?;
1738            }
1739            Ok(generation_number)
1740        } else {
1741            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1742            // been initialised by the time we get here. Ideally we'd panic, or return an
1743            // error, but for now I'm just adding some logging to check if it
1744            // happens, and returning the magic number 0.
1745            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1746            Ok(0)
1747        }
1748    }
1749
1750    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1751    /// spin-waits until the lock is available.
1752    ///
1753    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1754    /// first time.
1755    pub async fn spin_lock_store(
1756        &self,
1757        max_backoff: Option<u32>,
1758    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1759        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1760            let guard = lock
1761                .spin_lock(max_backoff)
1762                .await
1763                .map_err(|err| {
1764                    Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(
1765                        Box::new(err),
1766                    )))
1767                })?
1768                .map_err(|err| Error::CrossProcessLockError(Box::new(err.into())))?;
1769
1770            let generation = self.on_lock_newly_acquired().await?;
1771
1772            Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1773                _guard: guard.into_guard(),
1774                generation,
1775            }))
1776        } else {
1777            Ok(None)
1778        }
1779    }
1780
1781    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1782    /// attempts to lock it once.
1783    ///
1784    /// Returns a guard to the lock, if it was obtained.
1785    pub async fn try_lock_store_once(
1786        &self,
1787    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1788        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1789            let lock_result = lock.try_lock_once().await?;
1790
1791            let Some(guard) = lock_result.ok() else {
1792                return Ok(None);
1793            };
1794
1795            let generation = self.on_lock_newly_acquired().await?;
1796
1797            Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1798                _guard: guard.into_guard(),
1799                generation,
1800            }))
1801        } else {
1802            Ok(None)
1803        }
1804    }
1805
/// Testing purposes only.
///
/// Asks the `OlmMachine` for its uploaded key count; fails with
/// [`Error::AuthenticationRequired`] when there is no `OlmMachine`.
#[cfg(any(test, feature = "testing"))]
pub async fn uploaded_key_count(&self) -> Result<u64> {
    let machine_guard = self.client.olm_machine().await;
    let Some(machine) = machine_guard.as_ref() else {
        return Err(Error::AuthenticationRequired);
    };

    let count = machine.uploaded_key_count().await?;
    Ok(count)
}
1813
1814    /// Bootstrap encryption and enables event listeners for the E2EE support.
1815    ///
1816    /// Based on the `EncryptionSettings`, this call might:
1817    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1818    /// - Create a key backup if needed (POST `/room_keys/version`)
1819    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1820    ///
1821    /// As part of this process, and if needed, the current device keys would be
1822    /// uploaded to the server, new account data would be added, and cross
1823    /// signing keys and signatures might be uploaded.
1824    ///
1825    /// Should be called once we
1826    /// created a [`OlmMachine`], i.e. after logging in.
1827    ///
1828    /// # Arguments
1829    ///
1830    /// * `auth_data` - Some requests may require re-authentication. To prevent
1831    ///   the user from having to re-enter their password (or use other
1832    ///   methods), we can provide the authentication data here. This is
1833    ///   necessary for uploading cross-signing keys. However, please note that
1834    ///   there is a proposal (MSC3967) to remove this requirement, which would
1835    ///   allow for the initial upload of cross-signing keys without
1836    ///   authentication, rendering this parameter obsolete.
1837    pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1838        // It's fine to be async here as we're only getting the lock protecting the
1839        // `OlmMachine`. Since the lock shouldn't be that contested right after logging
1840        // in we won't delay the login or restoration of the Client.
1841        let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1842            Some(BundleReceiverTask::new(&self.client).await)
1843        } else {
1844            None
1845        };
1846
1847        let mut tasks = self.client.inner.e2ee.tasks.lock();
1848
1849        let this = self.clone();
1850
1851        tasks.setup_e2ee = Some(spawn(async move {
1852            // Update the current state first, so we don't have to wait for the result of
1853            // network requests
1854            this.update_verification_state().await;
1855
1856            if this.settings().auto_enable_cross_signing
1857                && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1858            {
1859                error!("Couldn't bootstrap cross signing {e:?}");
1860            }
1861
1862            if let Err(e) = this.backups().setup_and_resume().await {
1863                error!("Couldn't setup and resume backups {e:?}");
1864            }
1865            if let Err(e) = this.recovery().setup().await {
1866                error!("Couldn't setup and resume recovery {e:?}");
1867            }
1868        }));
1869
1870        tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1871    }
1872
1873    /// Waits for end-to-end encryption initialization tasks to finish, if any
1874    /// was running in the background.
1875    pub async fn wait_for_e2ee_initialization_tasks(&self) {
1876        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1877
1878        if let Some(task) = task
1879            && let Err(err) = task.await
1880        {
1881            warn!("Error when initializing backups: {err}");
1882        }
1883    }
1884
1885    /// Upload the device keys and initial set of one-time keys to the server.
1886    ///
1887    /// This should only be called when the user logs in for the first time,
1888    /// the method will ensure that other devices see our own device as an
1889    /// end-to-end encryption enabled one.
1890    ///
1891    /// **Warning**: Do not use this method if we're already calling
1892    /// [`Client::send_outgoing_request()`]. This method is intended for
1893    /// explicitly uploading the device keys before starting a sync.
1894    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1895        let olm = self.client.olm_machine().await;
1896        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1897
1898        if let Some((request_id, request)) = olm.upload_device_keys().await? {
1899            self.client.keys_upload(&request_id, &request).await?;
1900
1901            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1902            self.client.keys_query(&request_id, request.device_keys).await?;
1903        }
1904
1905        Ok(())
1906    }
1907
1908    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1909        self.recovery().update_state_after_keys_query(response).await;
1910
1911        // Only update the verification_state if our own devices changed
1912        if let Some(user_id) = self.client.user_id() {
1913            let contains_own_device = response.device_keys.contains_key(user_id);
1914
1915            if contains_own_device {
1916                self.update_verification_state().await;
1917            }
1918        }
1919    }
1920
1921    async fn update_verification_state(&self) {
1922        match self.get_own_device().await {
1923            Ok(device) => {
1924                if let Some(device) = device {
1925                    let is_verified = device.is_cross_signed_by_owner();
1926
1927                    if is_verified {
1928                        self.client.inner.verification_state.set(VerificationState::Verified);
1929                    } else {
1930                        self.client.inner.verification_state.set(VerificationState::Unverified);
1931                    }
1932                } else {
1933                    warn!("Couldn't find out own device in the store.");
1934                    self.client.inner.verification_state.set(VerificationState::Unknown);
1935                }
1936            }
1937            Err(error) => {
1938                warn!("Failed retrieving own device: {error}");
1939                self.client.inner.verification_state.set(VerificationState::Unknown);
1940            }
1941        }
1942    }
1943
/// Encrypts then sends the given content via the `/sendToDevice` end-point
/// using Olm encryption.
///
/// If there are a lot of recipient devices multiple `/sendToDevice`
/// requests might be sent out.
///
/// # Arguments
///
/// * `recipient_devices` - The devices the content should be sent to.
/// * `event_type` - The type of the to-device event.
/// * `content` - The raw content to encrypt.
/// * `share_strategy` - The strategy handed to the encryption step along
///   with the recipients.
///
/// # Returns
///
/// A list of failures: the devices that couldn't get the message, either
/// because the content was withheld from them or because the request
/// carrying their message failed to be sent.
///
/// # Errors
///
/// Fails if claiming one-time keys or encrypting the content fails, and
/// with [`Error::NoOlmMachine`] if the Olm machine isn't available.
#[cfg(feature = "experimental-send-custom-to-device")]
pub async fn encrypt_and_send_raw_to_device(
    &self,
    recipient_devices: Vec<&Device>,
    event_type: &str,
    content: Raw<AnyToDeviceEventContent>,
    share_strategy: CollectStrategy,
) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
    let users = recipient_devices.iter().map(|device| device.user_id());

    // Will claim one-time-key for users that needs it
    // TODO: For later optimisation: This will establish missing olm sessions with
    // all this users devices, but we just want for some devices.
    self.client.claim_one_time_keys(users).await?;

    // A missing Olm machine is reported to the caller instead of panicking,
    // like the other methods that need the machine do.
    let olm = self.client.olm_machine().await;
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let (requests, withhelds) = olm
        .encrypt_content_for_devices(
            recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
            event_type,
            &content
                .deserialize_as::<serde_json::Value>()
                .expect("Deserialize as Value will always work"),
            share_strategy,
        )
        .await?;

    // Devices the content was withheld from count as failures from the
    // start.
    let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = withhelds
        .iter()
        .map(|(device, _)| (device.user_id().to_owned(), device.device_id().to_owned()))
        .collect();

    // TODO: parallelize that? it's already grouping 250 devices per chunk.
    for request in requests {
        let ruma_request = RumaToDeviceRequest::new_raw(
            request.event_type.clone(),
            request.txn_id.clone(),
            request.messages.clone(),
        );

        let send_result = self
            .client
            .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
            .await;

        // A failed request means every device addressed by it missed the
        // message, so all of them are reported.
        if send_result.is_err() {
            for (user_id, device_map) in request.messages {
                failures.extend(device_map.keys().filter_map(|target| match target {
                    DeviceIdOrAllDevices::DeviceId(device_id) => {
                        Some((user_id.clone(), device_id.to_owned()))
                    }
                    // Cannot happen in this case
                    DeviceIdOrAllDevices::AllDevices => None,
                }));
            }
        }
    }

    Ok(failures)
}
2021}
2022
2023#[cfg(all(test, not(target_family = "wasm")))]
2024mod tests {
2025    use std::{
2026        ops::Not,
2027        str::FromStr,
2028        sync::{
2029            Arc,
2030            atomic::{AtomicBool, Ordering},
2031        },
2032        time::Duration,
2033    };
2034
2035    use matrix_sdk_test::{
2036        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
2037        test_json,
2038    };
2039    use ruma::{
2040        event_id,
2041        events::{reaction::ReactionEventContent, relation::Annotation},
2042    };
2043    use serde_json::json;
2044    use wiremock::{
2045        Mock, MockServer, Request, ResponseTemplate,
2046        matchers::{header, method, path_regex},
2047    };
2048
2049    use crate::{
2050        Client, assert_next_matches_with_timeout,
2051        config::RequestConfig,
2052        encryption::{
2053            DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2054        },
2055        test_utils::{
2056            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2057        },
2058    };
2059
2060    #[async_test]
2061    async fn test_reaction_sending() {
2062        let server = MockServer::start().await;
2063        let client = logged_in_client(Some(server.uri())).await;
2064
2065        let event_id = event_id!("$2:example.org");
2066
2067        Mock::given(method("GET"))
2068            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2069            .and(header("authorization", "Bearer 1234"))
2070            .respond_with(
2071                ResponseTemplate::new(200)
2072                    .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
2073            )
2074            .mount(&server)
2075            .await;
2076
2077        Mock::given(method("PUT"))
2078            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2079            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2080                "event_id": event_id,
2081            })))
2082            .mount(&server)
2083            .await;
2084
2085        let response = SyncResponseBuilder::default()
2086            .add_joined_room(
2087                JoinedRoomBuilder::default()
2088                    .add_state_event(StateTestEvent::Member)
2089                    .add_state_event(StateTestEvent::PowerLevels)
2090                    .add_state_event(StateTestEvent::Encryption),
2091            )
2092            .build_sync_response();
2093
2094        client.base_client().receive_sync_response(response).await.unwrap();
2095
2096        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2097        assert!(
2098            room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2099        );
2100
2101        let event_id = event_id!("$1:example.org");
2102        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2103        room.send(reaction).await.expect("Sending the reaction should not fail");
2104
2105        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2106    }
2107
/// When another client took the cross-process lock in between, re-taking
/// the lock regenerates the `OlmMachine`, and the backup keys survive the
/// regeneration.
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_invalidates_olm_machine() {
    use matrix_sdk_base::store::RoomLoadSettings;

    // Two clients are backed by the very same sqlite database.
    let db_path = std::env::temp_dir().join("generation_counter_sqlite.db");
    let session = mock_matrix_session();

    let first_client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&db_path, None)
        .build()
        .await
        .unwrap();
    first_client
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    let second_client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(db_path, None)
        .build()
        .await
        .unwrap();
    second_client
        .matrix_auth()
        .restore_session(session, RoomLoadSettings::default())
        .await
        .unwrap();

    // As long as the lock isn't enabled, trying to lock yields no guard.
    let no_guard = first_client.encryption().try_lock_store_once().await.unwrap();
    assert!(no_guard.is_none());

    first_client
        .encryption()
        .enable_cross_process_store_lock("client1".to_owned())
        .await
        .unwrap();
    second_client
        .encryption()
        .enable_cross_process_store_lock("client2".to_owned())
        .await
        .unwrap();

    // The first client is able to take the lock.
    let first_hold = first_client.encryption().try_lock_store_once().await.unwrap();
    assert!(first_hold.is_some());

    // Remember the olm machine, to be able to tell later on whether it has
    // been replaced.
    let initial_olm_machine =
        first_client.olm_machine().await.clone().expect("must have an olm machine");

    // Enable backups too, to check that a new machine has the same backup
    // keys.
    let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
        .expect("Can't create new recovery key");
    let backup_key = decryption_key.megolm_v1_public_key();
    backup_key.set_version("1".to_owned());

    let backup_machine = initial_olm_machine.backup_machine();
    backup_machine
        .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
        .await
        .expect("Should save");
    backup_machine.enable_backup_v1(backup_key.clone()).await.unwrap();

    assert!(first_client.encryption().backups().are_enabled().await);

    // While the first client holds the lock, the second one can't get it.
    let second_attempt = second_client.encryption().try_lock_store_once().await.unwrap();
    assert!(second_attempt.is_none());

    // The first client releases the lock...
    drop(first_hold);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // ...and takes it again.
    let first_rehold = first_client.encryption().try_lock_store_once().await.unwrap();
    assert!(first_rehold.is_some());

    // Nobody else held the lock in between: the machine is unchanged.
    let unchanged_machine =
        first_client.olm_machine().await.clone().expect("must have an olm machine");
    assert!(initial_olm_machine.same_as(&unchanged_machine));

    // Release once more.
    drop(first_rehold);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // This time the second client gets the lock,
    let second_hold = second_client.encryption().try_lock_store_once().await.unwrap();
    assert!(second_hold.is_some());

    // and gives it back.
    drop(second_hold);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // The first client can take the lock again,
    let first_final_hold = first_client.encryption().try_lock_store_once().await.unwrap();
    assert!(first_final_hold.is_some());

    // but its olm machine has been invalidated, and thus regenerated!
    let regenerated_machine =
        first_client.olm_machine().await.clone().expect("must have an olm machine");
    assert!(!initial_olm_machine.same_as(&regenerated_machine));

    // The regenerated machine still knows about the backup keys.
    let restored_keys = regenerated_machine.backup_machine().get_backup_keys().await.unwrap();
    assert!(restored_keys.decryption_key.is_some());
    assert_eq!(
        restored_keys.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
        backup_key.to_base64()
    );
    assert!(first_client.encryption().backups().are_enabled().await);
}
2214
/// The `OlmMachine` is only regenerated when another holder took the lock
/// in between, and neither by enabling the lock nor by re-taking it.
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_no_spurious_invalidation() {
    use matrix_sdk_base::store::RoomLoadSettings;

    // Both clients of this test share the same sqlite database.
    let db_path = std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
    let session = mock_matrix_session();

    let client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&db_path, None)
        .build()
        .await
        .unwrap();
    client
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    let machine_at_start = client.olm_machine().await.as_ref().unwrap().clone();

    client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

    // Merely enabling the lock leaves the olm machine alone.
    let machine_after_enabling = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(machine_at_start.same_as(&machine_after_enabling));

    {
        // Simulate another client that held the lock beforehand.
        let other_client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(db_path, None)
            .build()
            .await
            .unwrap();
        other_client
            .matrix_auth()
            .restore_session(session, RoomLoadSettings::default())
            .await
            .unwrap();

        other_client
            .encryption()
            .enable_cross_process_store_lock("client2".to_owned())
            .await
            .unwrap();

        let other_hold = other_client.encryption().spin_lock_store(None).await.unwrap();
        assert!(other_hold.is_some());

        drop(other_hold);
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // First acquisition by our client; the guard is released right away.
    let first_hold = client.encryption().try_lock_store_once().await.unwrap();
    assert!(first_hold.is_some());
    drop(first_hold);

    // Taking the lock for the first time updates the olm machine.
    let machine_after_first_lock = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(!machine_at_start.same_as(&machine_after_first_lock));

    // Second acquisition, released right away as well.
    let second_hold = client.encryption().try_lock_store_once().await.unwrap();
    assert!(second_hold.is_some());
    drop(second_hold);

    // Taking the lock again leaves the olm machine untouched.
    let machine_after_second_lock = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(machine_after_first_lock.same_as(&machine_after_second_lock));
}
2292
2293    #[async_test]
2294    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2295        // Given a client and a server
2296        let client = no_retry_test_client(None).await;
2297        let server = MockServer::start().await;
2298
2299        // When we subscribe to its verification state
2300        let mut verification_state = client.encryption().verification_state();
2301
2302        // We can get its initial value, and it's Unknown
2303        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2304
2305        // We set up a mocked request to check this endpoint is not called before
2306        // reading the new state
2307        let keys_requested = Arc::new(AtomicBool::new(false));
2308        let inner_bool = keys_requested.clone();
2309
2310        Mock::given(method("GET"))
2311            .and(path_regex(
2312                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2313            ))
2314            .respond_with(move |_req: &Request| {
2315                inner_bool.fetch_or(true, Ordering::SeqCst);
2316                ResponseTemplate::new(200).set_body_json(json!({}))
2317            })
2318            .mount(&server)
2319            .await;
2320
2321        // When the session is initialised and the encryption tasks spawn
2322        set_client_session(&client).await;
2323
2324        // Then we can get an updated value without waiting for any network requests
2325        assert!(keys_requested.load(Ordering::SeqCst).not());
2326        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2327    }
2328
/// The cross-signing reset info can be extracted from a UIAA response
/// that advertises the `org.matrix.cross_signing_reset` stage.
#[test]
fn test_oauth_reset_info_from_uiaa_info() {
    let raw_uiaa_info = json!({
        "session": "dummy",
        "flows": [
            {
                "stages": [
                    "org.matrix.cross_signing_reset"
                ]
            }
        ],
        "params": {
            "org.matrix.cross_signing_reset": {
                "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
            }
        },
        "msg": "To reset..."
    });

    let uiaa_info = serde_json::from_value(raw_uiaa_info)
        .expect("We should be able to deserialize the UiaaInfo");

    let reset_info = OAuthCrossSigningResetInfo::from_auth_info(&uiaa_info);
    reset_info
        .expect("We should be able to fetch the cross-signing reset info from the auth info");
}
2353
2354    #[test]
2355    fn test_duplicate_one_time_key_error_parsing() {
2356        let message = concat!(
2357            r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2358            r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2359            r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2360            r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2361            r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2362            r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2363            r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2364        );
2365        let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2366            .expect("We should be able to parse the error message");
2367
2368        assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2369        assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2370
2371        DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2372            .expect_err("We shouldn't be able to parse an incomplete error message");
2373    }
2374
2375    // Helper function for the test_devices_to_verify_against_* tests.  Make a
2376    // response to a /keys/query request using the given device keys and a
2377    // pre-defined set of cross-signing keys.
2378    fn devices_to_verify_against_keys_query_response(
2379        devices: Vec<serde_json::Value>,
2380    ) -> serde_json::Value {
2381        let device_keys: serde_json::Map<String, serde_json::Value> = devices
2382            .into_iter()
2383            .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2384            .collect();
2385        json!({
2386            "device_keys": {
2387                "@example:localhost": device_keys,
2388            },
2389            "master_keys": {
2390                "@example:localhost": {
2391                    "keys": {
2392                        "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2393                    },
2394                    "usage": ["master"],
2395                    "user_id": "@example:localhost",
2396                },
2397            },
2398            "self_signing_keys": {
2399                "@example:localhost": {
2400                    "keys": {
2401                        "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2402                    },
2403                    "usage": ["self_signing"],
2404                    "user_id": "@example:localhost",
2405                    "signatures": {
2406                        "@example:localhost": {
2407                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2408                        },
2409                    },
2410                },
2411            },
2412            "user_signing_keys": {
2413                "@example:localhost": {
2414                    "keys": {
2415                        "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2416                    },
2417                    "usage": ["user_signing"],
2418                    "user_id": "@example:localhost",
2419                    "signatures": {
2420                        "@example:localhost": {
2421                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2422                        },
2423                    },
2424                },
2425            }
2426        })
2427    }
2428
2429    // The following three tests test that we can detect whether the user has
2430    // other devices that they can verify against under different conditions.
2431    #[async_test]
2432    /// Test that we detect that can't verify against another device if we have
2433    /// no devices.
2434    async fn test_devices_to_verify_against_no_devices() {
2435        let server = MockServer::start().await;
2436        let client = logged_in_client(Some(server.uri())).await;
2437
2438        Mock::given(method("POST"))
2439            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2440            .respond_with(
2441                ResponseTemplate::new(200)
2442                    .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2443            )
2444            .mount(&server)
2445            .await;
2446
2447        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2448    }
2449
2450    #[async_test]
2451    /// Test that we detect that we can verify against another cross-signed
2452    /// regular device.
2453    async fn test_devices_to_verify_against_cross_signed() {
2454        let server = MockServer::start().await;
2455        let client = logged_in_client(Some(server.uri())).await;
2456
2457        Mock::given(method("POST"))
2458            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2459            .respond_with(ResponseTemplate::new(200).set_body_json(
2460                devices_to_verify_against_keys_query_response(vec![
2461                    json!({
2462                        "algorithms": [
2463                            "m.olm.v1.curve25519-aes-sha2",
2464                            "m.megolm.v1.aes-sha2",
2465                        ],
2466                        "user_id": "@example:localhost",
2467                        "device_id": "SIGNEDDEVICE",
2468                        "keys": {
2469                            "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2470                            "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2471                        },
2472                        "signatures": {
2473                            "@example:localhost": {
2474                                "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2475                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2476                            },
2477                        },
2478                    })
2479                ])
2480            ))
2481            .mount(&server)
2482            .await;
2483
2484        assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2485    }
2486
2487    #[async_test]
2488    /// Test that we detect that we can't verify against a dehydrated or
2489    /// unsigned device.
2490    async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2491        let server = MockServer::start().await;
2492        let client = logged_in_client(Some(server.uri())).await;
2493        let user_id = client.user_id().unwrap();
2494        let olm_machine = client.olm_machine().await;
2495        let olm_machine = olm_machine.as_ref().unwrap();
2496
2497        Mock::given(method("POST"))
2498            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2499            .respond_with(ResponseTemplate::new(200).set_body_json(
2500                devices_to_verify_against_keys_query_response(vec![
2501                    json!({
2502                        "algorithms": [
2503                            "m.olm.v1.curve25519-aes-sha2",
2504                            "m.megolm.v1.aes-sha2",
2505                        ],
2506                        "user_id": "@example:localhost",
2507                        "device_id": "DEHYDRATEDDEVICE",
2508                        "keys": {
2509                            "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2510                            "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2511                        },
2512                        "dehydrated": true,
2513                        "signatures": {
2514                            "@example:localhost": {
2515                                "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2516                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2517                            },
2518                        },
2519                    }),
2520                    json!({
2521                        "algorithms": [
2522                            "m.olm.v1.curve25519-aes-sha2",
2523                            "m.megolm.v1.aes-sha2",
2524                        ],
2525                        "user_id": "@example:localhost",
2526                        "device_id": "UNSIGNEDDEVICE",
2527                        "keys": {
2528                            "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2529                            "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2530                        },
2531                        "signatures": {
2532                            "@example:localhost": {
2533                                "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2534                            },
2535                        },
2536                    }),
2537                ])
2538            ))
2539            .mount(&server)
2540            .await;
2541
2542        let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2543        client.keys_query(&request_id, request.device_keys).await.unwrap();
2544
2545        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2546    }
2547}