matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22    collections::{BTreeMap, HashSet},
23    io::{Cursor, Read, Write},
24    iter,
25    path::PathBuf,
26    str::FromStr,
27    sync::Arc,
28};
29
30use eyeball::{SharedObservable, Subscriber};
31use futures_core::Stream;
32use futures_util::{
33    future::try_join,
34    stream::{self, StreamExt},
35};
36#[cfg(feature = "experimental-send-custom-to-device")]
37use matrix_sdk_base::crypto::CollectStrategy;
38use matrix_sdk_base::{
39    StateStoreDataKey, StateStoreDataValue,
40    cross_process_lock::CrossProcessLockError,
41    crypto::{
42        CrossSigningBootstrapRequests, OlmMachine,
43        store::types::{RoomKeyBundleInfo, RoomKeyInfo},
44        types::{
45            SignedKey,
46            requests::{
47                OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
48            },
49        },
50    },
51};
52use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
53use ruma::{
54    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
55    api::client::{
56        error::{ErrorBody, StandardErrorBody},
57        keys::{
58            get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
59            upload_signing_keys::v3::Request as UploadSigningKeysRequest,
60        },
61        message::send_message_event,
62        to_device::send_event_to_device::v3::{
63            Request as RumaToDeviceRequest, Response as ToDeviceResponse,
64        },
65        uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
66    },
67    assign,
68    events::room::{MediaSource, ThumbnailInfo},
69};
70#[cfg(feature = "experimental-send-custom-to-device")]
71use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
72use serde::{Deserialize, de::Error as _};
73use tasks::BundleReceiverTask;
74use tokio::sync::{Mutex, RwLockReadGuard};
75use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
76use tracing::{debug, error, instrument, warn};
77use url::Url;
78use vodozemac::Curve25519PublicKey;
79
80use self::{
81    backups::{Backups, types::BackupClientState},
82    futures::UploadEncryptedFile,
83    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
84    recovery::{Recovery, RecoveryState},
85    secret_storage::SecretStorage,
86    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
87    verification::{SasVerification, Verification, VerificationRequest},
88};
89use crate::{
90    Client, Error, HttpError, Result, RumaApiError, TransmissionProgress,
91    attachment::Thumbnail,
92    client::{ClientInner, WeakClient},
93    cross_process_lock::CrossProcessLockGuard,
94    error::HttpResult,
95};
96
97pub mod backups;
98pub mod futures;
99pub mod identities;
100pub mod recovery;
101pub mod secret_storage;
102pub(crate) mod tasks;
103pub mod verification;
104
105pub use matrix_sdk_base::crypto::{
106    CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
107    MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
108    SessionCreationError, SignatureError, VERSION,
109    olm::{
110        SessionCreationError as MegolmSessionCreationError,
111        SessionExportError as OlmSessionExportError,
112    },
113    vodozemac,
114};
115
116#[cfg(feature = "experimental-send-custom-to-device")]
117use crate::config::RequestConfig;
118pub use crate::error::RoomKeyImportError;
119
120/// All the data related to the encryption state.
121pub(crate) struct EncryptionData {
122    /// Background tasks related to encryption (key backup, initialization
123    /// tasks, etc.).
124    pub tasks: StdMutex<ClientTasks>,
125
126    /// End-to-end encryption settings.
127    pub encryption_settings: EncryptionSettings,
128
129    /// All state related to key backup.
130    pub backup_state: BackupClientState,
131
132    /// All state related to secret storage recovery.
133    pub recovery_state: SharedObservable<RecoveryState>,
134}
135
136impl EncryptionData {
137    pub fn new(encryption_settings: EncryptionSettings) -> Self {
138        Self {
139            encryption_settings,
140
141            tasks: StdMutex::new(Default::default()),
142            backup_state: Default::default(),
143            recovery_state: Default::default(),
144        }
145    }
146
147    pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
148        let weak_client = WeakClient::from_inner(client);
149
150        let mut tasks = self.tasks.lock();
151        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
152
153        if self.encryption_settings.backup_download_strategy
154            == BackupDownloadStrategy::AfterDecryptionFailure
155        {
156            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
157        }
158    }
159
160    /// Initialize the background task which listens for changes in the
161    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
162    ///
163    /// This should happen after the usual tasks have been set up and after the
164    /// E2EE initialization tasks have been set up.
165    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
166        let mut guard = self.tasks.lock();
167
168        let future = Recovery::update_state_after_backup_state_change(client);
169        let join_handle = spawn(future);
170
171        guard.update_recovery_state_after_backup = Some(join_handle);
172    }
173}
174
175/// Settings for end-to-end encryption features.
176#[derive(Clone, Copy, Debug, Default)]
177pub struct EncryptionSettings {
178    /// Automatically bootstrap cross-signing for a user once they're logged, in
179    /// case it's not already done yet.
180    ///
181    /// This requires to login with a username and password, or that MSC3967 is
182    /// enabled on the server, as of 2023-10-20.
183    pub auto_enable_cross_signing: bool,
184
185    /// Select a strategy to download room keys from the backup, by default room
186    /// keys won't be downloaded from the backup automatically.
187    ///
188    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
189    pub backup_download_strategy: BackupDownloadStrategy,
190
191    /// Automatically create a backup version if no backup exists.
192    pub auto_enable_backups: bool,
193}
194
/// The strategy deciding when room keys get downloaded from the backup.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// Automatically download all room keys from the backup when the backup
    /// recovery key has been received. The backup recovery key can be received
    /// in two ways:
    ///
    /// 1. Received as a `m.secret.send` to-device event, after a successful
    ///    interactive verification.
    /// 2. Imported from secret storage (4S) using the
    ///    [`SecretStore::import_secrets()`] method.
    ///
    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
    OneShot,

    /// Attempt to download a single room key if an event fails to be decrypted.
    AfterDecryptionFailure,

    /// Don't download any room keys automatically. The user can manually
    /// download room keys using the [`Backups::download_room_key()`] methods.
    ///
    /// This is the default option.
    #[default]
    Manual,
}
221
/// The verification state of our own device.
///
/// This enum tells us whether our own user identity trusts this device, in
/// other words whether the user identity has signed the device.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VerificationState {
    /// The verification state is not known yet.
    Unknown,
    /// The device is considered to be verified, it has been signed by its user
    /// identity.
    Verified,
    /// The device is not verified.
    Unverified,
}
236
237/// Wraps together a `CrossProcessLockStoreGuard` and a generation number.
238#[derive(Debug)]
239pub struct CrossProcessLockStoreGuardWithGeneration {
240    _guard: CrossProcessLockGuard,
241    generation: u64,
242}
243
244impl CrossProcessLockStoreGuardWithGeneration {
245    /// Return the Crypto Store generation associated with this store lock.
246    pub fn generation(&self) -> u64 {
247        self.generation
248    }
249}
250
251/// A stateful struct remembering the cross-signing keys we need to upload.
252///
253/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
254/// additional authentication, this struct will contain information on the type
255/// of authentication the user needs to complete before the upload might be
256/// continued.
257///
258/// More info can be found in the [spec].
259///
260/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
261#[derive(Debug)]
262pub struct CrossSigningResetHandle {
263    client: Client,
264    upload_request: UploadSigningKeysRequest,
265    signatures_request: UploadSignaturesRequest,
266    auth_type: CrossSigningResetAuthType,
267    is_cancelled: Mutex<bool>,
268}
269
270impl CrossSigningResetHandle {
271    /// Set up a new `CrossSigningResetHandle`.
272    pub fn new(
273        client: Client,
274        upload_request: UploadSigningKeysRequest,
275        signatures_request: UploadSignaturesRequest,
276        auth_type: CrossSigningResetAuthType,
277    ) -> Self {
278        Self {
279            client,
280            upload_request,
281            signatures_request,
282            auth_type,
283            is_cancelled: Mutex::new(false),
284        }
285    }
286
287    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
288    /// is using.
289    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
290        &self.auth_type
291    }
292
293    /// Continue the cross-signing reset by either waiting for the
294    /// authentication to be done on the side of the OAuth 2.0 server or by
295    /// providing additional [`AuthData`] the homeserver requires.
296    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
297        let mut upload_request = self.upload_request.clone();
298        upload_request.auth = auth;
299
300        while let Err(e) = self.client.send(upload_request.clone()).await {
301            if *self.is_cancelled.lock().await {
302                return Ok(());
303            }
304
305            match e.as_uiaa_response() {
306                Some(uiaa_info) => {
307                    if uiaa_info.auth_error.is_some() {
308                        return Err(e.into());
309                    }
310                }
311                None => return Err(e.into()),
312            }
313        }
314
315        self.client.send(self.signatures_request.clone()).await?;
316
317        Ok(())
318    }
319
320    /// Cancel the ongoing identity reset process
321    pub async fn cancel(&self) {
322        *self.is_cancelled.lock().await = true;
323    }
324}
325
326/// information about the additional authentication that is required before the
327/// cross-signing keys can be uploaded.
328#[derive(Debug, Clone)]
329pub enum CrossSigningResetAuthType {
330    /// The homeserver requires user-interactive authentication.
331    Uiaa(UiaaInfo),
332    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
333    /// approve the upload of cross-signing keys.
334    OAuth(OAuthCrossSigningResetInfo),
335}
336
337impl CrossSigningResetAuthType {
338    fn new(error: &HttpError) -> Result<Option<Self>> {
339        if let Some(auth_info) = error.as_uiaa_response() {
340            if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
341                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
342            } else {
343                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
344            }
345        } else {
346            Ok(None)
347        }
348    }
349}
350
351/// OAuth 2.0 specific information about the required authentication for the
352/// upload of cross-signing keys.
353#[derive(Debug, Clone, Deserialize)]
354pub struct OAuthCrossSigningResetInfo {
355    /// The URL where the user can approve the reset of the cross-signing keys.
356    pub approval_url: Url,
357}
358
359impl OAuthCrossSigningResetInfo {
360    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
361        let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
362            return Ok(None);
363        };
364
365        Ok(Some(OAuthCrossSigningResetInfo { approval_url: parameters.url.as_str().try_into()? }))
366    }
367}
368
369/// A struct that helps to parse the custom error message Synapse posts if a
370/// duplicate one-time key is uploaded.
371#[derive(Debug)]
372struct DuplicateOneTimeKeyErrorMessage {
373    /// The previously uploaded one-time key.
374    old_key: Curve25519PublicKey,
375    /// The one-time key we're attempting to upload right now.
376    new_key: Curve25519PublicKey,
377}
378
379impl FromStr for DuplicateOneTimeKeyErrorMessage {
380    type Err = serde_json::Error;
381
382    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
383        // First we split the string into two parts, the part containing the old key and
384        // the part containing the new key. The parts are conveniently separated
385        // by a `;` character.
386        let mut split = s.split_terminator(';');
387
388        let old_key = split
389            .next()
390            .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
391        let new_key = split
392            .next()
393            .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
394
395        // Now we remove the lengthy prefix from the part containing the old key, we
396        // should be left with just the JSON of the signed key.
397        let old_key_index = old_key
398            .find("Old key:")
399            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
400
401        let old_key = old_key[old_key_index..]
402            .trim()
403            .strip_prefix("Old key:")
404            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
405
406        // The part containing the new key is much simpler, we just remove a static
407        // prefix.
408        let new_key = new_key
409            .trim()
410            .strip_prefix("new key:")
411            .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
412
413        // The JSON containing the new key is for some reason quoted using single
414        // quotes, so let's replace them with normal double quotes.
415        let new_key = new_key.replace("'", "\"");
416
417        // Let's deserialize now.
418        let old_key: SignedKey = serde_json::from_str(old_key)?;
419        let new_key: SignedKey = serde_json::from_str(&new_key)?;
420
421        // Pick out the Curve keys, we don't care about the rest that much.
422        let old_key = old_key.key();
423        let new_key = new_key.key();
424
425        Ok(Self { old_key, new_key })
426    }
427}
428
429impl Client {
430    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
431        self.base_client().olm_machine().await
432    }
433
434    pub(crate) async fn mark_request_as_sent(
435        &self,
436        request_id: &TransactionId,
437        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
438    ) -> Result<(), matrix_sdk_base::Error> {
439        Ok(self
440            .olm_machine()
441            .await
442            .as_ref()
443            .expect(
444                "We should have an olm machine once we try to mark E2EE related requests as sent",
445            )
446            .mark_request_as_sent(request_id, response)
447            .await?)
448    }
449
450    /// Query the server for users device keys.
451    ///
452    /// # Panics
453    ///
454    /// Panics if no key query needs to be done.
455    #[instrument(skip(self, device_keys))]
456    pub(crate) async fn keys_query(
457        &self,
458        request_id: &TransactionId,
459        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
460    ) -> Result<get_keys::v3::Response> {
461        let request = assign!(get_keys::v3::Request::new(), { device_keys });
462
463        let response = self.send(request).await?;
464        self.mark_request_as_sent(request_id, &response).await?;
465        self.encryption().update_state_after_keys_query(&response).await;
466
467        Ok(response)
468    }
469
470    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
471    /// encrypting and uploading a provided reader.
472    ///
473    /// # Arguments
474    ///
475    /// * `content_type` - The content type of the file.
476    /// * `reader` - The reader that should be encrypted and uploaded.
477    ///
478    /// # Examples
479    ///
480    /// ```no_run
481    /// # use matrix_sdk::Client;
482    /// # use url::Url;
483    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
484    /// use serde::{Deserialize, Serialize};
485    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
486    ///
487    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
488    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
489    /// struct CustomEventContent {
490    ///     encrypted_file: EncryptedFile,
491    /// }
492    ///
493    /// # async {
494    /// # let homeserver = Url::parse("http://example.com")?;
495    /// # let client = Client::new(homeserver).await?;
496    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
497    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
498    /// let encrypted_file = client.upload_encrypted_file(&mut reader).await?;
499    ///
500    /// room.send(CustomEventContent { encrypted_file }).await?;
501    /// # anyhow::Ok(()) };
502    /// ```
503    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
504        &'a self,
505        reader: &'a mut R,
506    ) -> UploadEncryptedFile<'a, R> {
507        UploadEncryptedFile::new(self, reader)
508    }
509
510    /// Encrypt and upload the file and thumbnails, and return the source
511    /// information.
512    pub(crate) async fn upload_encrypted_media_and_thumbnail(
513        &self,
514        data: &[u8],
515        thumbnail: Option<Thumbnail>,
516        send_progress: SharedObservable<TransmissionProgress>,
517    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
518        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
519
520        let upload_attachment = async {
521            let mut cursor = Cursor::new(data);
522            self.upload_encrypted_file(&mut cursor)
523                .with_send_progress_observable(send_progress)
524                .await
525        };
526
527        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
528
529        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
530    }
531
532    /// Uploads an encrypted thumbnail to the media repository, and returns
533    /// its source and extra information.
534    async fn upload_encrypted_thumbnail(
535        &self,
536        thumbnail: Option<Thumbnail>,
537        send_progress: SharedObservable<TransmissionProgress>,
538    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
539        let Some(thumbnail) = thumbnail else {
540            return Ok(None);
541        };
542
543        let (data, _, thumbnail_info) = thumbnail.into_parts();
544        let mut cursor = Cursor::new(data);
545
546        let file = self
547            .upload_encrypted_file(&mut cursor)
548            .with_send_progress_observable(send_progress)
549            .await?;
550
551        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
552    }
553
554    /// Claim one-time keys creating new Olm sessions.
555    ///
556    /// # Arguments
557    ///
558    /// * `users` - The list of user/device pairs that we should claim keys for.
559    pub(crate) async fn claim_one_time_keys(
560        &self,
561        users: impl Iterator<Item = &UserId>,
562    ) -> Result<()> {
563        let _lock = self.locks().key_claim_lock.lock().await;
564
565        if let Some((request_id, request)) = self
566            .olm_machine()
567            .await
568            .as_ref()
569            .ok_or(Error::NoOlmMachine)?
570            .get_missing_sessions(users)
571            .await?
572        {
573            let response = self.send(request).await?;
574            self.mark_request_as_sent(&request_id, &response).await?;
575        }
576
577        Ok(())
578    }
579
580    /// Upload the E2E encryption keys.
581    ///
582    /// This uploads the long lived device keys as well as the required amount
583    /// of one-time keys.
584    ///
585    /// # Panics
586    ///
587    /// Panics if the client isn't logged in, or if no encryption keys need to
588    /// be uploaded.
589    #[instrument(skip(self, request))]
590    pub(crate) async fn keys_upload(
591        &self,
592        request_id: &TransactionId,
593        request: &upload_keys::v3::Request,
594    ) -> Result<upload_keys::v3::Response> {
595        debug!(
596            device_keys = request.device_keys.is_some(),
597            one_time_key_count = request.one_time_keys.len(),
598            "Uploading public encryption keys",
599        );
600
601        let response = self.send(request.clone()).await?;
602        self.mark_request_as_sent(request_id, &response).await?;
603
604        Ok(response)
605    }
606
607    pub(crate) async fn room_send_helper(
608        &self,
609        request: &RoomMessageRequest,
610    ) -> Result<send_message_event::v3::Response> {
611        let content = request.content.clone();
612        let txn_id = request.txn_id.clone();
613        let room_id = &request.room_id;
614
615        self.get_room(room_id)
616            .expect("Can't send a message to a room that isn't known to the store")
617            .send(*content)
618            .with_transaction_id(txn_id)
619            .await
620            .map(|result| result.response)
621    }
622
623    pub(crate) async fn send_to_device(
624        &self,
625        request: &ToDeviceRequest,
626    ) -> HttpResult<ToDeviceResponse> {
627        let request = RumaToDeviceRequest::new_raw(
628            request.event_type.clone(),
629            request.txn_id.clone(),
630            request.messages.clone(),
631        );
632
633        self.send(request).await
634    }
635
636    pub(crate) async fn send_verification_request(
637        &self,
638        request: OutgoingVerificationRequest,
639    ) -> Result<()> {
640        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
641
642        match request {
643            ToDevice(t) => {
644                self.send_to_device(&t).await?;
645            }
646            InRoom(r) => {
647                self.room_send_helper(&r).await?;
648            }
649        }
650
651        Ok(())
652    }
653
654    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
655        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
656
657        match r.request() {
658            AnyOutgoingRequest::KeysQuery(request) => {
659                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
660            }
661            AnyOutgoingRequest::KeysUpload(request) => {
662                let response = self.keys_upload(r.request_id(), request).await;
663
664                if let Err(e) = &response {
665                    match e.as_ruma_api_error() {
666                        Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
667                            if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
668                            {
669                                // This is one of the nastiest errors we can have. The server
670                                // telling us that we already have a one-time key uploaded means
671                                // that we forgot about some of our one-time keys. This will lead to
672                                // UTDs.
673                                {
674                                    let already_reported = self
675                                        .state_store()
676                                        .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
677                                        .await?
678                                        .is_some();
679
680                                    if message.starts_with("One time key") && !already_reported {
681                                        if let Ok(message) =
682                                            DuplicateOneTimeKeyErrorMessage::from_str(message)
683                                        {
684                                            error!(
685                                                sentry = true,
686                                                old_key = %message.old_key,
687                                                new_key = %message.new_key,
688                                                "Duplicate one-time keys have been uploaded"
689                                            );
690                                        } else {
691                                            error!(
692                                                sentry = true,
693                                                "Duplicate one-time keys have been uploaded"
694                                            );
695                                        }
696
697                                        self.state_store()
698                                            .set_kv_data(
699                                                StateStoreDataKey::OneTimeKeyAlreadyUploaded,
700                                                StateStoreDataValue::OneTimeKeyAlreadyUploaded,
701                                            )
702                                            .await?;
703                                    }
704                                }
705                            }
706                        }
707                        _ => {}
708                    }
709
710                    response?;
711                }
712            }
713            AnyOutgoingRequest::ToDeviceRequest(request) => {
714                let response = self.send_to_device(request).await?;
715                self.mark_request_as_sent(r.request_id(), &response).await?;
716            }
717            AnyOutgoingRequest::SignatureUpload(request) => {
718                let response = self.send(request.clone()).await?;
719                self.mark_request_as_sent(r.request_id(), &response).await?;
720            }
721            AnyOutgoingRequest::RoomMessage(request) => {
722                let response = self.room_send_helper(request).await?;
723                self.mark_request_as_sent(r.request_id(), &response).await?;
724            }
725            AnyOutgoingRequest::KeysClaim(request) => {
726                let response = self.send(request.clone()).await?;
727                self.mark_request_as_sent(r.request_id(), &response).await?;
728            }
729        }
730
731        Ok(())
732    }
733
734    #[instrument(skip_all)]
735    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
736        const MAX_CONCURRENT_REQUESTS: usize = 20;
737
738        // This is needed because sometimes we need to automatically
739        // claim some one-time keys to unwedge an existing Olm session.
740        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
741            warn!("Error while claiming one-time keys {:?}", e);
742        }
743
744        let outgoing_requests = stream::iter(
745            self.olm_machine()
746                .await
747                .as_ref()
748                .ok_or(Error::NoOlmMachine)?
749                .outgoing_requests()
750                .await?,
751        )
752        .map(|r| self.send_outgoing_request(r));
753
754        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
755
756        requests
757            .for_each(|r| async move {
758                match r {
759                    Ok(_) => (),
760                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
761                }
762            })
763            .await;
764
765        Ok(())
766    }
767}
768
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Public wrapper around the crate-private `olm_machine()` accessor, for
    /// testing purposes only.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        let guard = self.olm_machine().await;
        guard
    }
}
776
777/// A high-level API to manage the client's encryption.
778///
779/// To get this, use [`Client::encryption()`].
780#[derive(Debug, Clone)]
781pub struct Encryption {
782    /// The underlying client.
783    client: Client,
784}
785
786impl Encryption {
787    pub(crate) fn new(client: Client) -> Self {
788        Self { client }
789    }
790
791    /// Returns the current encryption settings for this client.
792    pub(crate) fn settings(&self) -> EncryptionSettings {
793        self.client.inner.e2ee.encryption_settings
794    }
795
796    /// Get the public ed25519 key of our own device. This is usually what is
797    /// called the fingerprint of the device.
798    pub async fn ed25519_key(&self) -> Option<String> {
799        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
800    }
801
802    /// Get the public Curve25519 key of our own device.
803    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
804        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
805    }
806
807    /// Get the current device creation timestamp.
808    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
809        match self.get_own_device().await {
810            Ok(Some(device)) => device.first_time_seen_ts(),
811            // Should not happen, there should always be an own device
812            _ => MilliSecondsSinceUnixEpoch::now(),
813        }
814    }
815
816    pub(crate) async fn import_secrets_bundle(
817        &self,
818        bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
819    ) -> Result<(), SecretImportError> {
820        let olm_machine = self.client.olm_machine().await;
821        let olm_machine =
822            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
823
824        olm_machine.store().import_secrets_bundle(bundle).await
825    }
826
827    /// Get the status of the private cross signing keys.
828    ///
829    /// This can be used to check which private cross signing keys we have
830    /// stored locally.
831    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
832        let olm = self.client.olm_machine().await;
833        let machine = olm.as_ref()?;
834        Some(machine.cross_signing_status().await)
835    }
836
837    /// Does the user have other devices that the current device can verify
838    /// against?
839    ///
840    /// The device must be signed by the user's cross-signing key, must have an
841    /// identity, and must not be a dehydrated device.
842    pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
843        let olm_machine = self.client.olm_machine().await;
844        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
845        let user_id = olm_machine.user_id();
846
847        self.ensure_initial_key_query().await?;
848
849        let devices = self.get_user_devices(user_id).await?;
850
851        let ret = devices.devices().any(|device| {
852            device.is_cross_signed_by_owner()
853                && device.curve25519_key().is_some()
854                && !device.is_dehydrated()
855        });
856
857        Ok(ret)
858    }
859
860    /// Get all the tracked users we know about
861    ///
862    /// Tracked users are users for which we keep the device list of E2EE
863    /// capable devices up to date.
864    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
865        if let Some(machine) = self.client.olm_machine().await.as_ref() {
866            machine.tracked_users().await
867        } else {
868            Ok(HashSet::new())
869        }
870    }
871
872    /// Get a [`Subscriber`] for the [`VerificationState`].
873    ///
874    /// # Examples
875    ///
876    /// ```no_run
877    /// use matrix_sdk::{Client, encryption};
878    /// use url::Url;
879    ///
880    /// # async {
881    /// let homeserver = Url::parse("http://example.com")?;
882    /// let client = Client::new(homeserver).await?;
883    /// let mut subscriber = client.encryption().verification_state();
884    ///
885    /// let current_value = subscriber.get();
886    ///
887    /// println!("The current verification state is: {current_value:?}");
888    ///
889    /// if let Some(verification_state) = subscriber.next().await {
890    ///     println!("Received verification state update {:?}", verification_state)
891    /// }
892    /// # anyhow::Ok(()) };
893    /// ```
894    pub fn verification_state(&self) -> Subscriber<VerificationState> {
895        self.client.inner.verification_state.subscribe_reset()
896    }
897
898    /// Get a verification object with the given flow id.
899    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
900        let olm = self.client.olm_machine().await;
901        let olm = olm.as_ref()?;
902        #[allow(clippy::bind_instead_of_map)]
903        olm.get_verification(user_id, flow_id).and_then(|v| match v {
904            matrix_sdk_base::crypto::Verification::SasV1(sas) => {
905                Some(SasVerification { inner: sas, client: self.client.clone() }.into())
906            }
907            #[cfg(feature = "qrcode")]
908            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
909                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
910            }
911            _ => None,
912        })
913    }
914
915    /// Get a `VerificationRequest` object for the given user with the given
916    /// flow id.
917    pub async fn get_verification_request(
918        &self,
919        user_id: &UserId,
920        flow_id: impl AsRef<str>,
921    ) -> Option<VerificationRequest> {
922        let olm = self.client.olm_machine().await;
923        let olm = olm.as_ref()?;
924
925        olm.get_verification_request(user_id, flow_id)
926            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
927    }
928
929    /// Get a specific device of a user.
930    ///
931    /// # Arguments
932    ///
933    /// * `user_id` - The unique id of the user that the device belongs to.
934    ///
935    /// * `device_id` - The unique id of the device.
936    ///
937    /// Returns a `Device` if one is found and the crypto store didn't throw an
938    /// error.
939    ///
940    /// This will always return None if the client hasn't been logged in.
941    ///
942    /// # Examples
943    ///
944    /// ```no_run
945    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
946    /// # use url::Url;
947    /// # async {
948    /// # let alice = user_id!("@alice:example.org");
949    /// # let homeserver = Url::parse("http://example.com")?;
950    /// # let client = Client::new(homeserver).await?;
951    /// if let Some(device) =
952    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
953    /// {
954    ///     println!("{:?}", device.is_verified());
955    ///
956    ///     if !device.is_verified() {
957    ///         let verification = device.request_verification().await?;
958    ///     }
959    /// }
960    /// # anyhow::Ok(()) };
961    /// ```
962    pub async fn get_device(
963        &self,
964        user_id: &UserId,
965        device_id: &DeviceId,
966    ) -> Result<Option<Device>, CryptoStoreError> {
967        let olm = self.client.olm_machine().await;
968        let Some(machine) = olm.as_ref() else { return Ok(None) };
969        let device = machine.get_device(user_id, device_id, None).await?;
970        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
971    }
972
973    /// A convenience method to retrieve your own device from the store.
974    ///
975    /// This is the same as calling [`Encryption::get_device()`] with your own
976    /// user and device ID.
977    ///
978    /// This will always return a device, unless you are not logged in.
979    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
980        let olm = self.client.olm_machine().await;
981        let Some(machine) = olm.as_ref() else { return Ok(None) };
982        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
983        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
984    }
985
986    /// Get a map holding all the devices of an user.
987    ///
988    /// This will always return an empty map if the client hasn't been logged
989    /// in.
990    ///
991    /// # Arguments
992    ///
993    /// * `user_id` - The unique id of the user that the devices belong to.
994    ///
995    /// # Examples
996    ///
997    /// ```no_run
998    /// # use matrix_sdk::{Client, ruma::user_id};
999    /// # use url::Url;
1000    /// # async {
1001    /// # let alice = user_id!("@alice:example.org");
1002    /// # let homeserver = Url::parse("http://example.com")?;
1003    /// # let client = Client::new(homeserver).await?;
1004    /// let devices = client.encryption().get_user_devices(alice).await?;
1005    ///
1006    /// for device in devices.devices() {
1007    ///     println!("{device:?}");
1008    /// }
1009    /// # anyhow::Ok(()) };
1010    /// ```
1011    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1012        let devices = self
1013            .client
1014            .olm_machine()
1015            .await
1016            .as_ref()
1017            .ok_or(Error::NoOlmMachine)?
1018            .get_user_devices(user_id, None)
1019            .await?;
1020
1021        Ok(UserDevices { inner: devices, client: self.client.clone() })
1022    }
1023
1024    /// Get the E2EE identity of a user from the crypto store.
1025    ///
1026    /// Usually, we only have the E2EE identity of a user locally if the user
1027    /// is tracked, meaning that we are both members of the same encrypted room.
1028    ///
1029    /// To get the E2EE identity of a user even if it is not available locally
1030    /// use [`Encryption::request_user_identity()`].
1031    ///
1032    /// # Arguments
1033    ///
1034    /// * `user_id` - The unique id of the user that the identity belongs to.
1035    ///
1036    /// Returns a `UserIdentity` if one is found and the crypto store
1037    /// didn't throw an error.
1038    ///
1039    /// This will always return None if the client hasn't been logged in.
1040    ///
1041    /// # Examples
1042    ///
1043    /// ```no_run
1044    /// # use matrix_sdk::{Client, ruma::user_id};
1045    /// # use url::Url;
1046    /// # async {
1047    /// # let alice = user_id!("@alice:example.org");
1048    /// # let homeserver = Url::parse("http://example.com")?;
1049    /// # let client = Client::new(homeserver).await?;
1050    /// let user = client.encryption().get_user_identity(alice).await?;
1051    ///
1052    /// if let Some(user) = user {
1053    ///     println!("{:?}", user.is_verified());
1054    ///
1055    ///     let verification = user.request_verification().await?;
1056    /// }
1057    /// # anyhow::Ok(()) };
1058    /// ```
1059    pub async fn get_user_identity(
1060        &self,
1061        user_id: &UserId,
1062    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1063        let olm = self.client.olm_machine().await;
1064        let Some(olm) = olm.as_ref() else { return Ok(None) };
1065        let identity = olm.get_identity(user_id, None).await?;
1066
1067        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1068    }
1069
1070    /// Get the E2EE identity of a user from the homeserver.
1071    ///
1072    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
1073    /// E2EE identity is not found, it should mean that the user did not set
1074    /// up cross-signing.
1075    ///
1076    /// If you want the E2EE identity of a user without making a request to the
1077    /// homeserver, use [`Encryption::get_user_identity()`] instead.
1078    ///
1079    /// # Arguments
1080    ///
1081    /// * `user_id` - The ID of the user that the identity belongs to.
1082    ///
1083    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
1084    /// was an issue with the crypto store or with the request to the
1085    /// homeserver.
1086    ///
1087    /// This will always return `None` if the client hasn't been logged in.
1088    ///
1089    /// # Examples
1090    ///
1091    /// ```no_run
1092    /// # use matrix_sdk::{Client, ruma::user_id};
1093    /// # use url::Url;
1094    /// # async {
1095    /// # let alice = user_id!("@alice:example.org");
1096    /// # let homeserver = Url::parse("http://example.com")?;
1097    /// # let client = Client::new(homeserver).await?;
1098    /// let user = client.encryption().request_user_identity(alice).await?;
1099    ///
1100    /// if let Some(user) = user {
1101    ///     println!("User is verified: {:?}", user.is_verified());
1102    ///
1103    ///     let verification = user.request_verification().await?;
1104    /// }
1105    /// # anyhow::Ok(()) };
1106    /// ```
1107    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1108        let olm = self.client.olm_machine().await;
1109        let Some(olm) = olm.as_ref() else { return Ok(None) };
1110
1111        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1112        self.client.keys_query(&request_id, request.device_keys).await?;
1113
1114        let identity = olm.get_identity(user_id, None).await?;
1115        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1116    }
1117
1118    /// Returns a stream of device updates, allowing users to listen for
1119    /// notifications about new or changed devices.
1120    ///
1121    /// The stream produced by this method emits updates whenever a new device
1122    /// is discovered or when an existing device's information is changed. Users
1123    /// can subscribe to this stream and receive updates in real-time.
1124    ///
1125    /// # Examples
1126    ///
1127    /// ```no_run
1128    /// # use matrix_sdk::Client;
1129    /// # use ruma::{device_id, user_id};
1130    /// # use futures_util::{pin_mut, StreamExt};
1131    /// # let client: Client = unimplemented!();
1132    /// # async {
1133    /// let devices_stream = client.encryption().devices_stream().await?;
1134    /// let user_id = client
1135    ///     .user_id()
1136    ///     .expect("We should know our user id after we have logged in");
1137    /// pin_mut!(devices_stream);
1138    ///
1139    /// for device_updates in devices_stream.next().await {
1140    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1141    ///         for device in user_devices.values() {
1142    ///             println!("A new device has been added {}", device.device_id());
1143    ///         }
1144    ///     }
1145    /// }
1146    /// # anyhow::Ok(()) };
1147    /// ```
1148    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1149        let olm = self.client.olm_machine().await;
1150        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1151        let client = self.client.to_owned();
1152
1153        Ok(olm
1154            .store()
1155            .devices_stream()
1156            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1157    }
1158
1159    /// Returns a stream of user identity updates, allowing users to listen for
1160    /// notifications about new or changed user identities.
1161    ///
1162    /// The stream produced by this method emits updates whenever a new user
1163    /// identity is discovered or when an existing identities information is
1164    /// changed. Users can subscribe to this stream and receive updates in
1165    /// real-time.
1166    ///
1167    /// # Examples
1168    ///
1169    /// ```no_run
1170    /// # use matrix_sdk::Client;
1171    /// # use ruma::{device_id, user_id};
1172    /// # use futures_util::{pin_mut, StreamExt};
1173    /// # let client: Client = unimplemented!();
1174    /// # async {
1175    /// let identities_stream =
1176    ///     client.encryption().user_identities_stream().await?;
1177    /// pin_mut!(identities_stream);
1178    ///
1179    /// for identity_updates in identities_stream.next().await {
1180    ///     for (_, identity) in identity_updates.new {
1181    ///         println!("A new identity has been added {}", identity.user_id());
1182    ///     }
1183    /// }
1184    /// # anyhow::Ok(()) };
1185    /// ```
1186    pub async fn user_identities_stream(
1187        &self,
1188    ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1189        let olm = self.client.olm_machine().await;
1190        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1191        let client = self.client.to_owned();
1192
1193        Ok(olm
1194            .store()
1195            .user_identities_stream()
1196            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1197    }
1198
1199    /// Create and upload a new cross signing identity.
1200    ///
1201    /// # Arguments
1202    ///
1203    /// * `auth_data` - This request requires user interactive auth, the first
1204    ///   request needs to set this to `None` and will always fail with an
1205    ///   `UiaaResponse`. The response will contain information for the
1206    ///   interactive auth and the same request needs to be made but this time
1207    ///   with some `auth_data` provided.
1208    ///
1209    /// # Examples
1210    ///
1211    /// ```no_run
1212    /// # use std::collections::BTreeMap;
1213    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1214    /// # use url::Url;
1215    /// # use serde_json::json;
1216    /// # async {
1217    /// # let homeserver = Url::parse("http://example.com")?;
1218    /// # let client = Client::new(homeserver).await?;
1219    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1220    ///     if let Some(response) = e.as_uiaa_response() {
1221    ///         let mut password = uiaa::Password::new(
1222    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1223    ///             "wordpass".to_owned(),
1224    ///         );
1225    ///         password.session = response.session.clone();
1226    ///
1227    ///         client
1228    ///             .encryption()
1229    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1230    ///             .await
1231    ///             .expect("Couldn't bootstrap cross signing")
1232    ///     } else {
1233    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1234    ///     }
1235    /// }
1236    /// # anyhow::Ok(()) };
1237    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1238        let olm = self.client.olm_machine().await;
1239        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1240
1241        let CrossSigningBootstrapRequests {
1242            upload_signing_keys_req,
1243            upload_keys_req,
1244            upload_signatures_req,
1245        } = olm.bootstrap_cross_signing(false).await?;
1246
1247        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1248            auth: auth_data,
1249            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1250            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1251            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1252        });
1253
1254        if let Some(req) = upload_keys_req {
1255            self.client.send_outgoing_request(req).await?;
1256        }
1257        self.client.send(upload_signing_keys_req).await?;
1258        self.client.send(upload_signatures_req).await?;
1259
1260        Ok(())
1261    }
1262
1263    /// Reset the cross-signing keys.
1264    ///
1265    /// # Example
1266    ///
1267    /// ```no_run
1268    /// # use matrix_sdk::{ruma::api::client::uiaa, Client, encryption::CrossSigningResetAuthType};
1269    /// # use url::Url;
1270    /// # async {
1271    /// # let homeserver = Url::parse("http://example.com")?;
1272    /// # let client = Client::new(homeserver).await?;
1273    /// # let user_id = unimplemented!();
1274    /// let encryption = client.encryption();
1275    ///
1276    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1277    ///     match handle.auth_type() {
1278    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1279    ///             use matrix_sdk::ruma::api::client::uiaa;
1280    ///
1281    ///             let password = "1234".to_owned();
1282    ///             let mut password = uiaa::Password::new(user_id, password);
1283    ///             password.session = uiaa.session;
1284    ///
1285    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1286    ///         }
1287    ///         CrossSigningResetAuthType::OAuth(o) => {
1288    ///             println!(
1289    ///                 "To reset your end-to-end encryption cross-signing identity, \
1290    ///                 you first need to approve it at {}",
1291    ///                 o.approval_url
1292    ///             );
1293    ///             handle.auth(None).await?;
1294    ///         }
1295    ///     }
1296    /// }
1297    /// # anyhow::Ok(()) };
1298    /// ```
1299    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1300        let olm = self.client.olm_machine().await;
1301        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1302
1303        let CrossSigningBootstrapRequests {
1304            upload_keys_req,
1305            upload_signing_keys_req,
1306            upload_signatures_req,
1307        } = olm.bootstrap_cross_signing(true).await?;
1308
1309        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1310            auth: None,
1311            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1312            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1313            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1314        });
1315
1316        if let Some(req) = upload_keys_req {
1317            self.client.send_outgoing_request(req).await?;
1318        }
1319
1320        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1321            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1322                let client = self.client.clone();
1323
1324                Ok(Some(CrossSigningResetHandle::new(
1325                    client,
1326                    upload_signing_keys_req,
1327                    upload_signatures_req,
1328                    auth_type,
1329                )))
1330            } else {
1331                Err(error.into())
1332            }
1333        } else {
1334            self.client.send(upload_signatures_req).await?;
1335
1336            Ok(None)
1337        }
1338    }
1339
1340    /// Query the user's own device keys, if, and only if, we didn't have their
1341    /// identity in the first place.
1342    async fn ensure_initial_key_query(&self) -> Result<()> {
1343        let olm_machine = self.client.olm_machine().await;
1344        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1345
1346        let user_id = olm_machine.user_id();
1347
1348        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1349            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1350            self.client.keys_query(&request_id, request.device_keys).await?;
1351        }
1352
1353        Ok(())
1354    }
1355
1356    /// Create and upload a new cross signing identity, if that has not been
1357    /// done yet.
1358    ///
1359    /// This will only create a new cross-signing identity if the user had never
1360    /// done it before. If the user did it before, then this is a no-op.
1361    ///
1362    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1363    /// behavior of this function.
1364    ///
1365    /// # Arguments
1366    ///
1367    /// * `auth_data` - This request requires user interactive auth, the first
1368    ///   request needs to set this to `None` and will always fail with an
1369    ///   `UiaaResponse`. The response will contain information for the
1370    ///   interactive auth and the same request needs to be made but this time
1371    ///   with some `auth_data` provided.
1372    ///
1373    /// # Examples
1374    /// ```no_run
1375    /// # use std::collections::BTreeMap;
1376    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1377    /// # use url::Url;
1378    /// # use serde_json::json;
1379    /// # async {
1380    /// # let homeserver = Url::parse("http://example.com")?;
1381    /// # let client = Client::new(homeserver).await?;
1382    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1383    ///     if let Some(response) = e.as_uiaa_response() {
1384    ///         let mut password = uiaa::Password::new(
1385    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1386    ///             "wordpass".to_owned(),
1387    ///         );
1388    ///         password.session = response.session.clone();
1389    ///
1390    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1391    ///         // avoid checks.
1392    ///         client
1393    ///             .encryption()
1394    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1395    ///             .await
1396    ///             .expect("Couldn't bootstrap cross signing")
1397    ///     } else {
1398    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1399    ///     }
1400    /// }
1401    /// # anyhow::Ok(()) };
1402    pub async fn bootstrap_cross_signing_if_needed(
1403        &self,
1404        auth_data: Option<AuthData>,
1405    ) -> Result<()> {
1406        let olm_machine = self.client.olm_machine().await;
1407        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1408        let user_id = olm_machine.user_id();
1409
1410        self.ensure_initial_key_query().await?;
1411
1412        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1413            self.bootstrap_cross_signing(auth_data).await?;
1414        }
1415
1416        Ok(())
1417    }
1418
1419    /// Export E2EE keys that match the given predicate encrypting them with the
1420    /// given passphrase.
1421    ///
1422    /// # Arguments
1423    ///
1424    /// * `path` - The file path where the exported key file will be saved.
1425    ///
1426    /// * `passphrase` - The passphrase that will be used to encrypt the
1427    ///   exported room keys.
1428    ///
1429    /// * `predicate` - A closure that will be called for every known
1430    ///   `InboundGroupSession`, which represents a room key. If the closure
1431    ///   returns `true` the `InboundGroupSessoin` will be included in the
1432    ///   export, if the closure returns `false` it will not be included.
1433    ///
1434    /// # Panics
1435    ///
1436    /// This method will panic if it isn't run on a Tokio runtime.
1437    ///
1438    /// This method will panic if it can't get enough randomness from the OS to
1439    /// encrypt the exported keys securely.
1440    ///
1441    /// # Examples
1442    ///
1443    /// ```no_run
1444    /// # use std::{path::PathBuf, time::Duration};
1445    /// # use matrix_sdk::{
1446    /// #     Client, config::SyncSettings,
1447    /// #     ruma::room_id,
1448    /// # };
1449    /// # use url::Url;
1450    /// # async {
1451    /// # let homeserver = Url::parse("http://localhost:8080")?;
1452    /// # let mut client = Client::new(homeserver).await?;
1453    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1454    /// // Export all room keys.
1455    /// client
1456    ///     .encryption()
1457    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1458    ///     .await?;
1459    ///
1460    /// // Export only the room keys for a certain room.
1461    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1462    /// let room_id = room_id!("!test:localhost");
1463    ///
1464    /// client
1465    ///     .encryption()
1466    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1467    ///     .await?;
1468    /// # anyhow::Ok(()) };
1469    /// ```
1470    #[cfg(not(target_family = "wasm"))]
1471    pub async fn export_room_keys(
1472        &self,
1473        path: PathBuf,
1474        passphrase: &str,
1475        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1476    ) -> Result<()> {
1477        let olm = self.client.olm_machine().await;
1478        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1479
1480        let keys = olm.store().export_room_keys(predicate).await?;
1481        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1482
1483        let encrypt = move || -> Result<()> {
1484            let export: String =
1485                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1486            let mut file = std::fs::File::create(path)?;
1487            file.write_all(&export.into_bytes())?;
1488            Ok(())
1489        };
1490
1491        let task = tokio::task::spawn_blocking(encrypt);
1492        task.await.expect("Task join error")
1493    }
1494
1495    /// Import E2EE keys from the given file path.
1496    ///
1497    /// # Arguments
1498    ///
1499    /// * `path` - The file path where the exported key file will can be found.
1500    ///
1501    /// * `passphrase` - The passphrase that should be used to decrypt the
1502    ///   exported room keys.
1503    ///
1504    /// Returns a tuple of numbers that represent the number of sessions that
1505    /// were imported and the total number of sessions that were found in the
1506    /// key export.
1507    ///
1508    /// # Panics
1509    ///
1510    /// This method will panic if it isn't run on a Tokio runtime.
1511    ///
1512    /// ```no_run
1513    /// # use std::{path::PathBuf, time::Duration};
1514    /// # use matrix_sdk::{
1515    /// #     Client, config::SyncSettings,
1516    /// #     ruma::room_id,
1517    /// # };
1518    /// # use url::Url;
1519    /// # async {
1520    /// # let homeserver = Url::parse("http://localhost:8080")?;
1521    /// # let mut client = Client::new(homeserver).await?;
1522    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1523    /// let result =
1524    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1525    ///
1526    /// println!(
1527    ///     "Imported {} room keys out of {}",
1528    ///     result.imported_count, result.total_count
1529    /// );
1530    /// # anyhow::Ok(()) };
1531    /// ```
1532    #[cfg(not(target_family = "wasm"))]
1533    pub async fn import_room_keys(
1534        &self,
1535        path: PathBuf,
1536        passphrase: &str,
1537    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1538        let olm = self.client.olm_machine().await;
1539        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1540        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1541
1542        let decrypt = move || {
1543            let file = std::fs::File::open(path)?;
1544            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1545        };
1546
1547        let task = tokio::task::spawn_blocking(decrypt);
1548        let import = task.await.expect("Task join error")?;
1549
1550        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1551
1552        self.backups().maybe_trigger_backup();
1553
1554        Ok(ret)
1555    }
1556
1557    /// Receive notifications of room keys being received as a [`Stream`].
1558    ///
1559    /// Each time a room key is updated in any way, an update will be sent to
1560    /// the stream. Updates that happen at the same time are batched into a
1561    /// [`Vec`].
1562    ///
1563    /// If the reader of the stream lags too far behind, an error is broadcast
1564    /// containing the number of skipped items.
1565    ///
1566    /// # Examples
1567    ///
1568    /// ```no_run
1569    /// # use matrix_sdk::Client;
1570    /// # use url::Url;
1571    /// # async {
1572    /// # let homeserver = Url::parse("http://example.com")?;
1573    /// # let client = Client::new(homeserver).await?;
1574    /// use futures_util::StreamExt;
1575    ///
1576    /// let Some(mut room_keys_stream) =
1577    ///     client.encryption().room_keys_received_stream().await
1578    /// else {
1579    ///     return Ok(());
1580    /// };
1581    ///
1582    /// while let Some(update) = room_keys_stream.next().await {
1583    ///     println!("Received room keys {update:?}");
1584    /// }
1585    /// # anyhow::Ok(()) };
1586    /// ```
1587    pub async fn room_keys_received_stream(
1588        &self,
1589    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1590    {
1591        let olm = self.client.olm_machine().await;
1592        let olm = olm.as_ref()?;
1593
1594        Some(olm.store().room_keys_received_stream())
1595    }
1596
1597    /// Receive notifications of historic room key bundles as a [`Stream`].
1598    ///
1599    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1600    ///
1601    /// Each time a historic room key bundle was received, an update will be
1602    /// sent to the stream. This stream is useful for informative purposes
1603    /// exclusively, historic room key bundles are handled by the SDK
1604    /// automatically.
1605    ///
1606    /// # Examples
1607    ///
1608    /// ```no_run
1609    /// # use matrix_sdk::Client;
1610    /// # use url::Url;
1611    /// # async {
1612    /// # let homeserver = Url::parse("http://example.com")?;
1613    /// # let client = Client::new(homeserver).await?;
1614    /// use futures_util::StreamExt;
1615    ///
1616    /// let Some(mut bundle_stream) =
1617    ///     client.encryption().historic_room_key_stream().await
1618    /// else {
1619    ///     return Ok(());
1620    /// };
1621    ///
1622    /// while let Some(bundle_info) = bundle_stream.next().await {
1623    ///     println!("Received a historic room key bundle {bundle_info:?}");
1624    /// }
1625    /// # anyhow::Ok(()) };
1626    /// ```
1627    pub async fn historic_room_key_stream(
1628        &self,
1629    ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1630        let olm = self.client.olm_machine().await;
1631        let olm = olm.as_ref()?;
1632
1633        Some(olm.store().historic_room_key_stream())
1634    }
1635
1636    /// Get the secret storage manager of the client.
1637    pub fn secret_storage(&self) -> SecretStorage {
1638        SecretStorage { client: self.client.to_owned() }
1639    }
1640
1641    /// Get the backups manager of the client.
1642    pub fn backups(&self) -> Backups {
1643        Backups { client: self.client.to_owned() }
1644    }
1645
1646    /// Get the recovery manager of the client.
1647    pub fn recovery(&self) -> Recovery {
1648        Recovery { client: self.client.to_owned() }
1649    }
1650
1651    /// Enables the crypto-store cross-process lock.
1652    ///
1653    /// This may be required if there are multiple processes that may do writes
1654    /// to the same crypto store. In that case, it's necessary to create a
1655    /// lock, so that only one process writes to it, otherwise this may
1656    /// cause confusing issues because of stale data contained in in-memory
1657    /// caches.
1658    ///
1659    /// The provided `lock_value` must be a unique identifier for this process.
1660    /// Check [`Client::cross_process_store_locks_holder_name`] to
1661    /// get the global value.
1662    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1663        // If the lock has already been created, don't recreate it from scratch.
1664        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1665            let prev_holder = prev_lock.lock_holder();
1666            if prev_holder == lock_value {
1667                return Ok(());
1668            }
1669            warn!(
1670                "Recreating cross-process store lock with a different holder value: \
1671                 prev was {prev_holder}, new is {lock_value}"
1672            );
1673        }
1674
1675        let olm_machine = self.client.base_client().olm_machine().await;
1676        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1677
1678        let lock =
1679            olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1680
1681        // Gently try to initialize the crypto store generation counter.
1682        //
1683        // If we don't get the lock immediately, then it is already acquired by another
1684        // process, and we'll get to reload next time we acquire the lock.
1685        {
1686            let lock_result = lock.try_lock_once().await?;
1687
1688            if lock_result.is_ok() {
1689                olm_machine
1690                    .initialize_crypto_store_generation(
1691                        &self.client.locks().crypto_store_generation,
1692                    )
1693                    .await?;
1694            }
1695        }
1696
1697        self.client
1698            .locks()
1699            .cross_process_crypto_store_lock
1700            .set(lock)
1701            .map_err(|_| Error::BadCryptoStoreState)?;
1702
1703        Ok(())
1704    }
1705
1706    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1707    /// time.
1708    ///
1709    /// Returns the current generation number.
1710    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1711        let olm_machine_guard = self.client.olm_machine().await;
1712        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1713            let (new_gen, generation_number) = olm_machine
1714                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1715                .await?;
1716            // If the crypto store generation has changed,
1717            if new_gen {
1718                // (get rid of the reference to the current crypto store first)
1719                drop(olm_machine_guard);
1720                // Recreate the OlmMachine.
1721                self.client.base_client().regenerate_olm(None).await?;
1722            }
1723            Ok(generation_number)
1724        } else {
1725            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1726            // been initialised by the time we get here. Ideally we'd panic, or return an
1727            // error, but for now I'm just adding some logging to check if it
1728            // happens, and returning the magic number 0.
1729            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1730            Ok(0)
1731        }
1732    }
1733
1734    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1735    /// spin-waits until the lock is available.
1736    ///
1737    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1738    /// first time.
1739    pub async fn spin_lock_store(
1740        &self,
1741        max_backoff: Option<u32>,
1742    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1743        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1744            let guard = lock
1745                .spin_lock(max_backoff)
1746                .await
1747                .map_err(|err| {
1748                    Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(
1749                        Box::new(err),
1750                    )))
1751                })?
1752                .map_err(|err| Error::CrossProcessLockError(Box::new(err.into())))?;
1753
1754            let generation = self.on_lock_newly_acquired().await?;
1755
1756            Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1757                _guard: guard.into_guard(),
1758                generation,
1759            }))
1760        } else {
1761            Ok(None)
1762        }
1763    }
1764
1765    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1766    /// attempts to lock it once.
1767    ///
1768    /// Returns a guard to the lock, if it was obtained.
1769    pub async fn try_lock_store_once(
1770        &self,
1771    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1772        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1773            let lock_result = lock.try_lock_once().await?;
1774
1775            let Some(guard) = lock_result.ok() else {
1776                return Ok(None);
1777            };
1778
1779            let generation = self.on_lock_newly_acquired().await?;
1780
1781            Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1782                _guard: guard.into_guard(),
1783                generation,
1784            }))
1785        } else {
1786            Ok(None)
1787        }
1788    }
1789
1790    /// Testing purposes only.
1791    #[cfg(any(test, feature = "testing"))]
1792    pub async fn uploaded_key_count(&self) -> Result<u64> {
1793        let olm_machine = self.client.olm_machine().await;
1794        let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1795        Ok(olm_machine.uploaded_key_count().await?)
1796    }
1797
1798    /// Bootstrap encryption and enables event listeners for the E2EE support.
1799    ///
1800    /// Based on the `EncryptionSettings`, this call might:
1801    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1802    /// - Create a key backup if needed (POST `/room_keys/version`)
1803    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1804    ///
1805    /// As part of this process, and if needed, the current device keys would be
1806    /// uploaded to the server, new account data would be added, and cross
1807    /// signing keys and signatures might be uploaded.
1808    ///
1809    /// Should be called once we
1810    /// created a [`OlmMachine`], i.e. after logging in.
1811    ///
1812    /// # Arguments
1813    ///
1814    /// * `auth_data` - Some requests may require re-authentication. To prevent
1815    ///   the user from having to re-enter their password (or use other
1816    ///   methods), we can provide the authentication data here. This is
1817    ///   necessary for uploading cross-signing keys. However, please note that
1818    ///   there is a proposal (MSC3967) to remove this requirement, which would
1819    ///   allow for the initial upload of cross-signing keys without
1820    ///   authentication, rendering this parameter obsolete.
1821    pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1822        // It's fine to be async here as we're only getting the lock protecting the
1823        // `OlmMachine`. Since the lock shouldn't be that contested right after logging
1824        // in we won't delay the login or restoration of the Client.
1825        let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1826            Some(BundleReceiverTask::new(&self.client).await)
1827        } else {
1828            None
1829        };
1830
1831        let mut tasks = self.client.inner.e2ee.tasks.lock();
1832
1833        let this = self.clone();
1834
1835        tasks.setup_e2ee = Some(spawn(async move {
1836            // Update the current state first, so we don't have to wait for the result of
1837            // network requests
1838            this.update_verification_state().await;
1839
1840            if this.settings().auto_enable_cross_signing
1841                && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1842            {
1843                error!("Couldn't bootstrap cross signing {e:?}");
1844            }
1845
1846            if let Err(e) = this.backups().setup_and_resume().await {
1847                error!("Couldn't setup and resume backups {e:?}");
1848            }
1849            if let Err(e) = this.recovery().setup().await {
1850                error!("Couldn't setup and resume recovery {e:?}");
1851            }
1852        }));
1853
1854        tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1855    }
1856
1857    /// Waits for end-to-end encryption initialization tasks to finish, if any
1858    /// was running in the background.
1859    pub async fn wait_for_e2ee_initialization_tasks(&self) {
1860        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1861
1862        if let Some(task) = task
1863            && let Err(err) = task.await
1864        {
1865            warn!("Error when initializing backups: {err}");
1866        }
1867    }
1868
1869    /// Upload the device keys and initial set of one-time keys to the server.
1870    ///
1871    /// This should only be called when the user logs in for the first time,
1872    /// the method will ensure that other devices see our own device as an
1873    /// end-to-end encryption enabled one.
1874    ///
1875    /// **Warning**: Do not use this method if we're already calling
1876    /// [`Client::send_outgoing_request()`]. This method is intended for
1877    /// explicitly uploading the device keys before starting a sync.
1878    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1879        let olm = self.client.olm_machine().await;
1880        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1881
1882        if let Some((request_id, request)) = olm.upload_device_keys().await? {
1883            self.client.keys_upload(&request_id, &request).await?;
1884
1885            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1886            self.client.keys_query(&request_id, request.device_keys).await?;
1887        }
1888
1889        Ok(())
1890    }
1891
1892    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1893        self.recovery().update_state_after_keys_query(response).await;
1894
1895        // Only update the verification_state if our own devices changed
1896        if let Some(user_id) = self.client.user_id() {
1897            let contains_own_device = response.device_keys.contains_key(user_id);
1898
1899            if contains_own_device {
1900                self.update_verification_state().await;
1901            }
1902        }
1903    }
1904
1905    async fn update_verification_state(&self) {
1906        match self.get_own_device().await {
1907            Ok(device) => {
1908                if let Some(device) = device {
1909                    let is_verified = device.is_cross_signed_by_owner();
1910
1911                    if is_verified {
1912                        self.client.inner.verification_state.set(VerificationState::Verified);
1913                    } else {
1914                        self.client.inner.verification_state.set(VerificationState::Unverified);
1915                    }
1916                } else {
1917                    warn!("Couldn't find out own device in the store.");
1918                    self.client.inner.verification_state.set(VerificationState::Unknown);
1919                }
1920            }
1921            Err(error) => {
1922                warn!("Failed retrieving own device: {error}");
1923                self.client.inner.verification_state.set(VerificationState::Unknown);
1924            }
1925        }
1926    }
1927
1928    /// Encrypts then send the given content via the `/sendToDevice` end-point
1929    /// using Olm encryption.
1930    ///
1931    /// If there are a lot of recipient devices multiple `/sendToDevice`
1932    /// requests might be sent out.
1933    ///
1934    /// # Returns
1935    /// A list of failures. The list of devices that couldn't get the messages.
1936    #[cfg(feature = "experimental-send-custom-to-device")]
1937    pub async fn encrypt_and_send_raw_to_device(
1938        &self,
1939        recipient_devices: Vec<&Device>,
1940        event_type: &str,
1941        content: Raw<AnyToDeviceEventContent>,
1942        share_strategy: CollectStrategy,
1943    ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
1944        let users = recipient_devices.iter().map(|device| device.user_id());
1945
1946        // Will claim one-time-key for users that needs it
1947        // TODO: For later optimisation: This will establish missing olm sessions with
1948        // all this users devices, but we just want for some devices.
1949        self.client.claim_one_time_keys(users).await?;
1950
1951        let olm = self.client.olm_machine().await;
1952        let olm = olm.as_ref().expect("Olm machine wasn't started");
1953
1954        let (requests, withhelds) = olm
1955            .encrypt_content_for_devices(
1956                recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
1957                event_type,
1958                &content
1959                    .deserialize_as::<serde_json::Value>()
1960                    .expect("Deserialize as Value will always work"),
1961                share_strategy,
1962            )
1963            .await?;
1964
1965        let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
1966
1967        // Push the withhelds in the failures
1968        withhelds.iter().for_each(|(d, _)| {
1969            failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
1970        });
1971
1972        // TODO: parallelize that? it's already grouping 250 devices per chunk.
1973        for request in requests {
1974            let ruma_request = RumaToDeviceRequest::new_raw(
1975                request.event_type.clone(),
1976                request.txn_id.clone(),
1977                request.messages.clone(),
1978            );
1979
1980            let send_result = self
1981                .client
1982                .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
1983                .await;
1984
1985            // If the sending failed we need to collect the failures to report them
1986            if send_result.is_err() {
1987                // Mark the sending as failed
1988                for (user_id, device_map) in request.messages {
1989                    for device_id in device_map.keys() {
1990                        match device_id {
1991                            DeviceIdOrAllDevices::DeviceId(device_id) => {
1992                                failures.push((user_id.clone(), device_id.to_owned()));
1993                            }
1994                            DeviceIdOrAllDevices::AllDevices => {
1995                                // Cannot happen in this case
1996                            }
1997                        }
1998                    }
1999                }
2000            }
2001        }
2002
2003        Ok(failures)
2004    }
2005}
2006
2007#[cfg(all(test, not(target_family = "wasm")))]
2008mod tests {
2009    use std::{
2010        ops::Not,
2011        str::FromStr,
2012        sync::{
2013            Arc,
2014            atomic::{AtomicBool, Ordering},
2015        },
2016        time::Duration,
2017    };
2018
2019    use matrix_sdk_test::{
2020        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
2021        test_json,
2022    };
2023    use ruma::{
2024        event_id,
2025        events::{reaction::ReactionEventContent, relation::Annotation},
2026    };
2027    use serde_json::json;
2028    use wiremock::{
2029        Mock, MockServer, Request, ResponseTemplate,
2030        matchers::{header, method, path_regex},
2031    };
2032
2033    use crate::{
2034        Client, assert_next_matches_with_timeout,
2035        config::RequestConfig,
2036        encryption::{
2037            DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2038        },
2039        test_utils::{
2040            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2041        },
2042    };
2043
2044    #[async_test]
2045    async fn test_reaction_sending() {
2046        let server = MockServer::start().await;
2047        let client = logged_in_client(Some(server.uri())).await;
2048
2049        let event_id = event_id!("$2:example.org");
2050
2051        Mock::given(method("GET"))
2052            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2053            .and(header("authorization", "Bearer 1234"))
2054            .respond_with(
2055                ResponseTemplate::new(200)
2056                    .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
2057            )
2058            .mount(&server)
2059            .await;
2060
2061        Mock::given(method("PUT"))
2062            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2063            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2064                "event_id": event_id,
2065            })))
2066            .mount(&server)
2067            .await;
2068
2069        let response = SyncResponseBuilder::default()
2070            .add_joined_room(
2071                JoinedRoomBuilder::default()
2072                    .add_state_event(StateTestEvent::Member)
2073                    .add_state_event(StateTestEvent::PowerLevels)
2074                    .add_state_event(StateTestEvent::Encryption),
2075            )
2076            .build_sync_response();
2077
2078        client.base_client().receive_sync_response(response).await.unwrap();
2079
2080        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2081        assert!(
2082            room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2083        );
2084
2085        let event_id = event_id!("$1:example.org");
2086        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2087        room.send(reaction).await.expect("Sending the reaction should not fail");
2088
2089        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2090    }
2091
    // Checks that a client's OlmMachine is regenerated after another client
    // sharing the same sqlite store has held the cross-process lock, and that
    // the backup keys are still available on the regenerated machine.
    #[cfg(feature = "sqlite")]
    #[async_test]
    async fn test_generation_counter_invalidates_olm_machine() {
        // Create two clients using the same sqlite database.

        use matrix_sdk_base::store::RoomLoadSettings;
        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
        let session = mock_matrix_session();

        let client1 = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&sqlite_path, None)
            .build()
            .await
            .unwrap();
        client1
            .matrix_auth()
            .restore_session(session.clone(), RoomLoadSettings::default())
            .await
            .unwrap();

        let client2 = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(sqlite_path, None)
            .build()
            .await
            .unwrap();
        client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();

        // When the lock isn't enabled, any attempt at locking won't return a guard.
        let guard = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(guard.is_none());

        // Enable the lock on both clients, each with its own holder value.
        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();

        // One client can take the lock.
        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired1.is_some());

        // Keep the olm machine, so we can see if it's changed later, by comparing Arcs.
        let initial_olm_machine =
            client1.olm_machine().await.clone().expect("must have an olm machine");

        // Also enable backup to check that new machine has the same backup keys.
        let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
            .expect("Can't create new recovery key");
        let backup_key = decryption_key.megolm_v1_public_key();
        backup_key.set_version("1".to_owned());
        initial_olm_machine
            .backup_machine()
            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
            .await
            .expect("Should save");

        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();

        assert!(client1.encryption().backups().are_enabled().await);

        // The other client can't take the lock too.
        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired2.is_none());

        // Now have the first client release the lock,
        drop(acquired1);
        // NOTE(review): the short sleep presumably leaves time for the release to
        // take effect before the next attempt; confirm against the lock
        // implementation.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // And re-take it.
        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired1.is_some());

        // In that case, the Olm Machine shouldn't change.
        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
        assert!(initial_olm_machine.same_as(&olm_machine));

        // Ok, release again.
        drop(acquired1);
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Client2 can acquire the lock.
        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired2.is_some());

        // And then release it.
        drop(acquired2);
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Client1 can acquire it again,
        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired1.is_some());

        // But now its olm machine has been invalidated and thus regenerated!
        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");

        assert!(!initial_olm_machine.same_as(&olm_machine));

        // The regenerated machine must still expose the backup decryption key
        // saved earlier, and backups must still be enabled.
        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
        assert!(backup_key_new.decryption_key.is_some());
        assert_eq!(
            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
            backup_key.to_base64()
        );
        assert!(client1.encryption().backups().are_enabled().await);
    }
2198
    // Checks that the OlmMachine is only regenerated when needed: not when
    // the lock gets enabled, once after another client has held the lock, and
    // not again when the same client re-takes the lock.
    #[cfg(feature = "sqlite")]
    #[async_test]
    async fn test_generation_counter_no_spurious_invalidation() {
        // Create a first client backed by a sqlite database; a second client
        // sharing the same database is created further down.

        use matrix_sdk_base::store::RoomLoadSettings;
        let sqlite_path =
            std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
        let session = mock_matrix_session();

        let client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&sqlite_path, None)
            .build()
            .await
            .unwrap();
        client
            .matrix_auth()
            .restore_session(session.clone(), RoomLoadSettings::default())
            .await
            .unwrap();

        // Keep the initial olm machine around to compare against later ones.
        let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();

        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

        // Enabling the lock doesn't update the olm machine.
        let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
        assert!(initial_olm_machine.same_as(&after_enabling_lock));

        {
            // Simulate that another client held the lock before.
            let client2 = Client::builder()
                .homeserver_url("http://localhost:1234")
                .request_config(RequestConfig::new().disable_retry())
                .sqlite_store(sqlite_path, None)
                .build()
                .await
                .unwrap();
            client2
                .matrix_auth()
                .restore_session(session, RoomLoadSettings::default())
                .await
                .unwrap();

            client2
                .encryption()
                .enable_cross_process_store_lock("client2".to_owned())
                .await
                .unwrap();

            let guard = client2.encryption().spin_lock_store(None).await.unwrap();
            assert!(guard.is_some());

            // Release the lock again.
            // NOTE(review): the short sleep presumably leaves time for the release
            // to take effect before the first client's attempt; confirm against
            // the lock implementation.
            drop(guard);
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        {
            // The first client takes the lock for the first time; the guard is
            // dropped at the end of this scope.
            let acquired = client.encryption().try_lock_store_once().await.unwrap();
            assert!(acquired.is_some());
        }

        // Taking the lock the first time will update the olm machine.
        let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
        assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));

        {
            // Take the lock a second time, with no other client in between.
            let acquired = client.encryption().try_lock_store_once().await.unwrap();
            assert!(acquired.is_some());
        }

        // Re-taking the lock doesn't update the olm machine.
        let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
        assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
    }
2276
2277    #[async_test]
2278    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2279        // Given a client and a server
2280        let client = no_retry_test_client(None).await;
2281        let server = MockServer::start().await;
2282
2283        // When we subscribe to its verification state
2284        let mut verification_state = client.encryption().verification_state();
2285
2286        // We can get its initial value, and it's Unknown
2287        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2288
2289        // We set up a mocked request to check this endpoint is not called before
2290        // reading the new state
2291        let keys_requested = Arc::new(AtomicBool::new(false));
2292        let inner_bool = keys_requested.clone();
2293
2294        Mock::given(method("GET"))
2295            .and(path_regex(
2296                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2297            ))
2298            .respond_with(move |_req: &Request| {
2299                inner_bool.fetch_or(true, Ordering::SeqCst);
2300                ResponseTemplate::new(200).set_body_json(json!({}))
2301            })
2302            .mount(&server)
2303            .await;
2304
2305        // When the session is initialised and the encryption tasks spawn
2306        set_client_session(&client).await;
2307
2308        // Then we can get an updated value without waiting for any network requests
2309        assert!(keys_requested.load(Ordering::SeqCst).not());
2310        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2311    }
2312
2313    #[test]
2314    fn test_oauth_reset_info_from_uiaa_info() {
2315        let auth_info = json!({
2316            "session": "dummy",
2317            "flows": [
2318                {
2319                    "stages": [
2320                        "org.matrix.cross_signing_reset"
2321                    ]
2322                }
2323            ],
2324            "params": {
2325                "org.matrix.cross_signing_reset": {
2326                    "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2327                }
2328            },
2329            "msg": "To reset..."
2330        });
2331
2332        let auth_info = serde_json::from_value(auth_info)
2333            .expect("We should be able to deserialize the UiaaInfo");
2334        OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2335            .expect("We should be able to fetch the cross-signing reset info from the auth info");
2336    }
2337
2338    #[test]
2339    fn test_duplicate_one_time_key_error_parsing() {
2340        let message = concat!(
2341            r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2342            r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2343            r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2344            r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2345            r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2346            r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2347            r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2348        );
2349        let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2350            .expect("We should be able to parse the error message");
2351
2352        assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2353        assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2354
2355        DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2356            .expect_err("We shouldn't be able to parse an incomplete error message");
2357    }
2358
2359    // Helper function for the test_devices_to_verify_against_* tests.  Make a
2360    // response to a /keys/query request using the given device keys and a
2361    // pre-defined set of cross-signing keys.
2362    fn devices_to_verify_against_keys_query_response(
2363        devices: Vec<serde_json::Value>,
2364    ) -> serde_json::Value {
2365        let device_keys: serde_json::Map<String, serde_json::Value> = devices
2366            .into_iter()
2367            .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2368            .collect();
2369        json!({
2370            "device_keys": {
2371                "@example:localhost": device_keys,
2372            },
2373            "master_keys": {
2374                "@example:localhost": {
2375                    "keys": {
2376                        "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2377                    },
2378                    "usage": ["master"],
2379                    "user_id": "@example:localhost",
2380                },
2381            },
2382            "self_signing_keys": {
2383                "@example:localhost": {
2384                    "keys": {
2385                        "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2386                    },
2387                    "usage": ["self_signing"],
2388                    "user_id": "@example:localhost",
2389                    "signatures": {
2390                        "@example:localhost": {
2391                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2392                        },
2393                    },
2394                },
2395            },
2396            "user_signing_keys": {
2397                "@example:localhost": {
2398                    "keys": {
2399                        "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2400                    },
2401                    "usage": ["user_signing"],
2402                    "user_id": "@example:localhost",
2403                    "signatures": {
2404                        "@example:localhost": {
2405                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2406                        },
2407                    },
2408                },
2409            }
2410        })
2411    }
2412
2413    // The following three tests test that we can detect whether the user has
2414    // other devices that they can verify against under different conditions.
2415    #[async_test]
2416    /// Test that we detect that can't verify against another device if we have
2417    /// no devices.
2418    async fn test_devices_to_verify_against_no_devices() {
2419        let server = MockServer::start().await;
2420        let client = logged_in_client(Some(server.uri())).await;
2421
2422        Mock::given(method("POST"))
2423            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2424            .respond_with(
2425                ResponseTemplate::new(200)
2426                    .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2427            )
2428            .mount(&server)
2429            .await;
2430
2431        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2432    }
2433
2434    #[async_test]
2435    /// Test that we detect that we can verify against another cross-signed
2436    /// regular device.
2437    async fn test_devices_to_verify_against_cross_signed() {
2438        let server = MockServer::start().await;
2439        let client = logged_in_client(Some(server.uri())).await;
2440
2441        Mock::given(method("POST"))
2442            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2443            .respond_with(ResponseTemplate::new(200).set_body_json(
2444                devices_to_verify_against_keys_query_response(vec![
2445                    json!({
2446                        "algorithms": [
2447                            "m.olm.v1.curve25519-aes-sha2",
2448                            "m.megolm.v1.aes-sha2",
2449                        ],
2450                        "user_id": "@example:localhost",
2451                        "device_id": "SIGNEDDEVICE",
2452                        "keys": {
2453                            "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2454                            "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2455                        },
2456                        "signatures": {
2457                            "@example:localhost": {
2458                                "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2459                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2460                            },
2461                        },
2462                    })
2463                ])
2464            ))
2465            .mount(&server)
2466            .await;
2467
2468        assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2469    }
2470
2471    #[async_test]
2472    /// Test that we detect that we can't verify against a dehydrated or
2473    /// unsigned device.
2474    async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2475        let server = MockServer::start().await;
2476        let client = logged_in_client(Some(server.uri())).await;
2477        let user_id = client.user_id().unwrap();
2478        let olm_machine = client.olm_machine().await;
2479        let olm_machine = olm_machine.as_ref().unwrap();
2480
2481        Mock::given(method("POST"))
2482            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2483            .respond_with(ResponseTemplate::new(200).set_body_json(
2484                devices_to_verify_against_keys_query_response(vec![
2485                    json!({
2486                        "algorithms": [
2487                            "m.olm.v1.curve25519-aes-sha2",
2488                            "m.megolm.v1.aes-sha2",
2489                        ],
2490                        "user_id": "@example:localhost",
2491                        "device_id": "DEHYDRATEDDEVICE",
2492                        "keys": {
2493                            "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2494                            "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2495                        },
2496                        "dehydrated": true,
2497                        "signatures": {
2498                            "@example:localhost": {
2499                                "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2500                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2501                            },
2502                        },
2503                    }),
2504                    json!({
2505                        "algorithms": [
2506                            "m.olm.v1.curve25519-aes-sha2",
2507                            "m.megolm.v1.aes-sha2",
2508                        ],
2509                        "user_id": "@example:localhost",
2510                        "device_id": "UNSIGNEDDEVICE",
2511                        "keys": {
2512                            "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2513                            "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2514                        },
2515                        "signatures": {
2516                            "@example:localhost": {
2517                                "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2518                            },
2519                        },
2520                    }),
2521                ])
2522            ))
2523            .mount(&server)
2524            .await;
2525
2526        let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2527        client.keys_query(&request_id, request.device_keys).await.unwrap();
2528
2529        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2530    }
2531}