Skip to main content

matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22    collections::{BTreeMap, HashSet},
23    io::{Cursor, Read, Write},
24    iter,
25    path::PathBuf,
26    str::FromStr,
27    sync::Arc,
28};
29
30use eyeball::{SharedObservable, Subscriber};
31use futures_core::Stream;
32use futures_util::{
33    future::try_join,
34    stream::{self, StreamExt},
35};
36#[cfg(feature = "experimental-send-custom-to-device")]
37use matrix_sdk_base::crypto::CollectStrategy;
38use matrix_sdk_base::{
39    StateStoreDataKey, StateStoreDataValue,
40    cross_process_lock::CrossProcessLockError,
41    crypto::{
42        CrossSigningBootstrapRequests, OlmMachine,
43        store::types::{RoomKeyBundleInfo, RoomKeyInfo},
44        types::{
45            SignedKey,
46            requests::{
47                OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
48            },
49        },
50    },
51};
52use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
53use ruma::{
54    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
55    api::client::{
56        error::{ErrorBody, StandardErrorBody},
57        keys::{
58            get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
59            upload_signing_keys::v3::Request as UploadSigningKeysRequest,
60        },
61        message::send_message_event,
62        to_device::send_event_to_device::v3::{
63            Request as RumaToDeviceRequest, Response as ToDeviceResponse,
64        },
65        uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
66    },
67    assign,
68    events::room::{MediaSource, ThumbnailInfo},
69};
70#[cfg(feature = "experimental-send-custom-to-device")]
71use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
72use serde::{Deserialize, de::Error as _};
73use tasks::BundleReceiverTask;
74use tokio::sync::{Mutex, RwLockReadGuard};
75use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
76use tracing::{debug, error, instrument, warn};
77use url::Url;
78use vodozemac::Curve25519PublicKey;
79
80use self::{
81    backups::{Backups, types::BackupClientState},
82    futures::UploadEncryptedFile,
83    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
84    recovery::{Recovery, RecoveryState},
85    secret_storage::SecretStorage,
86    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
87    verification::{SasVerification, Verification, VerificationRequest},
88};
89use crate::{
90    Client, Error, HttpError, Result, RumaApiError, TransmissionProgress,
91    attachment::Thumbnail,
92    client::{ClientInner, WeakClient},
93    cross_process_lock::CrossProcessLockGuard,
94    error::HttpResult,
95};
96
97pub mod backups;
98pub mod futures;
99pub mod identities;
100pub mod recovery;
101pub mod secret_storage;
102pub(crate) mod tasks;
103pub mod verification;
104
105pub use matrix_sdk_base::crypto::{
106    CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
107    MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
108    SessionCreationError, SignatureError, VERSION,
109    olm::{
110        SessionCreationError as MegolmSessionCreationError,
111        SessionExportError as OlmSessionExportError,
112    },
113    vodozemac,
114};
115use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
116
117#[cfg(feature = "experimental-send-custom-to-device")]
118use crate::config::RequestConfig;
119pub use crate::error::RoomKeyImportError;
120
121/// All the data related to the encryption state.
122pub(crate) struct EncryptionData {
123    /// Background tasks related to encryption (key backup, initialization
124    /// tasks, etc.).
125    pub tasks: StdMutex<ClientTasks>,
126
127    /// End-to-end encryption settings.
128    pub encryption_settings: EncryptionSettings,
129
130    /// All state related to key backup.
131    pub backup_state: BackupClientState,
132
133    /// All state related to secret storage recovery.
134    pub recovery_state: SharedObservable<RecoveryState>,
135}
136
137impl EncryptionData {
138    pub fn new(encryption_settings: EncryptionSettings) -> Self {
139        Self {
140            encryption_settings,
141
142            tasks: StdMutex::new(Default::default()),
143            backup_state: Default::default(),
144            recovery_state: Default::default(),
145        }
146    }
147
148    pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
149        let weak_client = WeakClient::from_inner(client);
150
151        let mut tasks = self.tasks.lock();
152        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
153
154        if self.encryption_settings.backup_download_strategy
155            == BackupDownloadStrategy::AfterDecryptionFailure
156        {
157            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
158        }
159    }
160
161    /// Initialize the background task which listens for changes in the
162    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
163    ///
164    /// This should happen after the usual tasks have been set up and after the
165    /// E2EE initialization tasks have been set up.
166    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
167        let mut guard = self.tasks.lock();
168
169        let future = Recovery::update_state_after_backup_state_change(client);
170        let join_handle = spawn(future);
171
172        guard.update_recovery_state_after_backup = Some(join_handle);
173    }
174}
175
176/// Settings for end-to-end encryption features.
177#[derive(Clone, Copy, Debug, Default)]
178pub struct EncryptionSettings {
179    /// Automatically bootstrap cross-signing for a user once they're logged, in
180    /// case it's not already done yet.
181    ///
182    /// This requires to login with a username and password, or that MSC3967 is
183    /// enabled on the server, as of 2023-10-20.
184    pub auto_enable_cross_signing: bool,
185
186    /// Select a strategy to download room keys from the backup, by default room
187    /// keys won't be downloaded from the backup automatically.
188    ///
189    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
190    pub backup_download_strategy: BackupDownloadStrategy,
191
192    /// Automatically create a backup version if no backup exists.
193    pub auto_enable_backups: bool,
194}
195
196/// Settings for end-to-end encryption features.
197#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
198#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
199pub enum BackupDownloadStrategy {
200    /// Automatically download all room keys from the backup when the backup
201    /// recovery key has been received. The backup recovery key can be received
202    /// in two ways:
203    ///
204    /// 1. Received as a `m.secret.send` to-device event, after a successful
205    ///    interactive verification.
206    /// 2. Imported from secret storage (4S) using the
207    ///    [`SecretStore::import_secrets()`] method.
208    ///
209    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
210    OneShot,
211
212    /// Attempt to download a single room key if an event fails to be decrypted.
213    AfterDecryptionFailure,
214
215    /// Don't download any room keys automatically. The user can manually
216    /// download room keys using the [`Backups::download_room_key()`] methods.
217    ///
218    /// This is the default option.
219    #[default]
220    Manual,
221}
222
223/// The verification state of our own device
224///
225/// This enum tells us if our own user identity trusts these devices, in other
226/// words it tells us if the user identity has signed the device.
227#[derive(Clone, Copy, Debug, Eq, PartialEq)]
228pub enum VerificationState {
229    /// The verification state is unknown for now.
230    Unknown,
231    /// The device is considered to be verified, it has been signed by its user
232    /// identity.
233    Verified,
234    /// The device is unverified.
235    Unverified,
236}
237
238/// Wraps together a `CrossProcessLockStoreGuard` and a generation number.
239#[derive(Debug)]
240pub struct CrossProcessLockStoreGuardWithGeneration {
241    _guard: CrossProcessLockGuard,
242    generation: u64,
243}
244
245impl CrossProcessLockStoreGuardWithGeneration {
246    /// Return the Crypto Store generation associated with this store lock.
247    pub fn generation(&self) -> u64 {
248        self.generation
249    }
250}
251
252/// A stateful struct remembering the cross-signing keys we need to upload.
253///
254/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
255/// additional authentication, this struct will contain information on the type
256/// of authentication the user needs to complete before the upload might be
257/// continued.
258///
259/// More info can be found in the [spec].
260///
261/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
262#[derive(Debug)]
263pub struct CrossSigningResetHandle {
264    client: Client,
265    upload_request: UploadSigningKeysRequest,
266    signatures_request: UploadSignaturesRequest,
267    auth_type: CrossSigningResetAuthType,
268    is_cancelled: Mutex<bool>,
269}
270
271impl CrossSigningResetHandle {
272    /// Set up a new `CrossSigningResetHandle`.
273    pub fn new(
274        client: Client,
275        upload_request: UploadSigningKeysRequest,
276        signatures_request: UploadSignaturesRequest,
277        auth_type: CrossSigningResetAuthType,
278    ) -> Self {
279        Self {
280            client,
281            upload_request,
282            signatures_request,
283            auth_type,
284            is_cancelled: Mutex::new(false),
285        }
286    }
287
288    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
289    /// is using.
290    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
291        &self.auth_type
292    }
293
294    /// Continue the cross-signing reset by either waiting for the
295    /// authentication to be done on the side of the OAuth 2.0 server or by
296    /// providing additional [`AuthData`] the homeserver requires.
297    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
298        let mut upload_request = self.upload_request.clone();
299        upload_request.auth = auth;
300
301        while let Err(e) = self.client.send(upload_request.clone()).await {
302            if *self.is_cancelled.lock().await {
303                return Ok(());
304            }
305
306            match e.as_uiaa_response() {
307                Some(uiaa_info) => {
308                    if uiaa_info.auth_error.is_some() {
309                        return Err(e.into());
310                    }
311                }
312                None => return Err(e.into()),
313            }
314        }
315
316        self.client.send(self.signatures_request.clone()).await?;
317
318        Ok(())
319    }
320
321    /// Cancel the ongoing identity reset process
322    pub async fn cancel(&self) {
323        *self.is_cancelled.lock().await = true;
324    }
325}
326
327/// information about the additional authentication that is required before the
328/// cross-signing keys can be uploaded.
329#[derive(Debug, Clone)]
330pub enum CrossSigningResetAuthType {
331    /// The homeserver requires user-interactive authentication.
332    Uiaa(UiaaInfo),
333    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
334    /// approve the upload of cross-signing keys.
335    OAuth(OAuthCrossSigningResetInfo),
336}
337
338impl CrossSigningResetAuthType {
339    fn new(error: &HttpError) -> Result<Option<Self>> {
340        if let Some(auth_info) = error.as_uiaa_response() {
341            if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
342                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
343            } else {
344                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
345            }
346        } else {
347            Ok(None)
348        }
349    }
350}
351
352/// OAuth 2.0 specific information about the required authentication for the
353/// upload of cross-signing keys.
354#[derive(Debug, Clone, Deserialize)]
355pub struct OAuthCrossSigningResetInfo {
356    /// The URL where the user can approve the reset of the cross-signing keys.
357    pub approval_url: Url,
358}
359
360impl OAuthCrossSigningResetInfo {
361    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
362        let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
363            return Ok(None);
364        };
365
366        Ok(Some(OAuthCrossSigningResetInfo { approval_url: parameters.url.as_str().try_into()? }))
367    }
368}
369
370/// A struct that helps to parse the custom error message Synapse posts if a
371/// duplicate one-time key is uploaded.
372#[derive(Clone, Debug)]
373pub struct DuplicateOneTimeKeyErrorMessage {
374    /// The previously uploaded one-time key.
375    pub old_key: Curve25519PublicKey,
376    /// The one-time key we're attempting to upload right now.
377    pub new_key: Curve25519PublicKey,
378}
379
380impl FromStr for DuplicateOneTimeKeyErrorMessage {
381    type Err = serde_json::Error;
382
383    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
384        // First we split the string into two parts, the part containing the old key and
385        // the part containing the new key. The parts are conveniently separated
386        // by a `;` character.
387        let mut split = s.split_terminator(';');
388
389        let old_key = split
390            .next()
391            .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
392        let new_key = split
393            .next()
394            .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
395
396        // Now we remove the lengthy prefix from the part containing the old key, we
397        // should be left with just the JSON of the signed key.
398        let old_key_index = old_key
399            .find("Old key:")
400            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
401
402        let old_key = old_key[old_key_index..]
403            .trim()
404            .strip_prefix("Old key:")
405            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
406
407        // The part containing the new key is much simpler, we just remove a static
408        // prefix.
409        let new_key = new_key
410            .trim()
411            .strip_prefix("new key:")
412            .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
413
414        // The JSON containing the new key is for some reason quoted using single
415        // quotes, so let's replace them with normal double quotes.
416        let new_key = new_key.replace("'", "\"");
417
418        // Let's deserialize now.
419        let old_key: SignedKey = serde_json::from_str(old_key)?;
420        let new_key: SignedKey = serde_json::from_str(&new_key)?;
421
422        // Pick out the Curve keys, we don't care about the rest that much.
423        let old_key = old_key.key();
424        let new_key = new_key.key();
425
426        Ok(Self { old_key, new_key })
427    }
428}
429
430impl Client {
431    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
432        self.base_client().olm_machine().await
433    }
434
435    pub(crate) async fn mark_request_as_sent(
436        &self,
437        request_id: &TransactionId,
438        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
439    ) -> Result<(), matrix_sdk_base::Error> {
440        Ok(self
441            .olm_machine()
442            .await
443            .as_ref()
444            .expect(
445                "We should have an olm machine once we try to mark E2EE related requests as sent",
446            )
447            .mark_request_as_sent(request_id, response)
448            .await?)
449    }
450
451    /// Query the server for users device keys.
452    ///
453    /// # Panics
454    ///
455    /// Panics if no key query needs to be done.
456    #[instrument(skip(self, device_keys))]
457    pub(crate) async fn keys_query(
458        &self,
459        request_id: &TransactionId,
460        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
461    ) -> Result<get_keys::v3::Response> {
462        let request = assign!(get_keys::v3::Request::new(), { device_keys });
463
464        let response = self.send(request).await?;
465        self.mark_request_as_sent(request_id, &response).await?;
466        self.encryption().update_state_after_keys_query(&response).await;
467
468        Ok(response)
469    }
470
471    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
472    /// encrypting and uploading a provided reader.
473    ///
474    /// # Arguments
475    ///
476    /// * `content_type` - The content type of the file.
477    /// * `reader` - The reader that should be encrypted and uploaded.
478    ///
479    /// # Examples
480    ///
481    /// ```no_run
482    /// # use matrix_sdk::Client;
483    /// # use url::Url;
484    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
485    /// use serde::{Deserialize, Serialize};
486    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
487    ///
488    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
489    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
490    /// struct CustomEventContent {
491    ///     encrypted_file: EncryptedFile,
492    /// }
493    ///
494    /// # async {
495    /// # let homeserver = Url::parse("http://example.com")?;
496    /// # let client = Client::new(homeserver).await?;
497    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
498    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
499    /// let encrypted_file = client.upload_encrypted_file(&mut reader).await?;
500    ///
501    /// room.send(CustomEventContent { encrypted_file }).await?;
502    /// # anyhow::Ok(()) };
503    /// ```
504    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
505        &'a self,
506        reader: &'a mut R,
507    ) -> UploadEncryptedFile<'a, R> {
508        UploadEncryptedFile::new(self, reader)
509    }
510
511    /// Encrypt and upload the file and thumbnails, and return the source
512    /// information.
513    pub(crate) async fn upload_encrypted_media_and_thumbnail(
514        &self,
515        data: &[u8],
516        thumbnail: Option<Thumbnail>,
517        send_progress: SharedObservable<TransmissionProgress>,
518    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
519        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
520
521        let upload_attachment = async {
522            let mut cursor = Cursor::new(data);
523            self.upload_encrypted_file(&mut cursor)
524                .with_send_progress_observable(send_progress)
525                .await
526        };
527
528        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
529
530        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
531    }
532
533    /// Uploads an encrypted thumbnail to the media repository, and returns
534    /// its source and extra information.
535    async fn upload_encrypted_thumbnail(
536        &self,
537        thumbnail: Option<Thumbnail>,
538        send_progress: SharedObservable<TransmissionProgress>,
539    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
540        let Some(thumbnail) = thumbnail else {
541            return Ok(None);
542        };
543
544        let (data, _, thumbnail_info) = thumbnail.into_parts();
545        let mut cursor = Cursor::new(data);
546
547        let file = self
548            .upload_encrypted_file(&mut cursor)
549            .with_send_progress_observable(send_progress)
550            .await?;
551
552        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
553    }
554
555    /// Claim one-time keys creating new Olm sessions.
556    ///
557    /// # Arguments
558    ///
559    /// * `users` - The list of user/device pairs that we should claim keys for.
560    pub(crate) async fn claim_one_time_keys(
561        &self,
562        users: impl Iterator<Item = &UserId>,
563    ) -> Result<()> {
564        let _lock = self.locks().key_claim_lock.lock().await;
565
566        if let Some((request_id, request)) = self
567            .olm_machine()
568            .await
569            .as_ref()
570            .ok_or(Error::NoOlmMachine)?
571            .get_missing_sessions(users)
572            .await?
573        {
574            let response = self.send(request).await?;
575            self.mark_request_as_sent(&request_id, &response).await?;
576        }
577
578        Ok(())
579    }
580
581    /// Upload the E2E encryption keys.
582    ///
583    /// This uploads the long lived device keys as well as the required amount
584    /// of one-time keys.
585    ///
586    /// # Panics
587    ///
588    /// Panics if the client isn't logged in, or if no encryption keys need to
589    /// be uploaded.
590    #[instrument(skip(self, request))]
591    pub(crate) async fn keys_upload(
592        &self,
593        request_id: &TransactionId,
594        request: &upload_keys::v3::Request,
595    ) -> Result<upload_keys::v3::Response> {
596        debug!(
597            device_keys = request.device_keys.is_some(),
598            one_time_key_count = request.one_time_keys.len(),
599            "Uploading public encryption keys",
600        );
601
602        let response = self.send(request.clone()).await?;
603        self.mark_request_as_sent(request_id, &response).await?;
604
605        Ok(response)
606    }
607
608    pub(crate) async fn room_send_helper(
609        &self,
610        request: &RoomMessageRequest,
611    ) -> Result<send_message_event::v3::Response> {
612        let content = request.content.clone();
613        let txn_id = request.txn_id.clone();
614        let room_id = &request.room_id;
615
616        self.get_room(room_id)
617            .expect("Can't send a message to a room that isn't known to the store")
618            .send(*content)
619            .with_transaction_id(txn_id)
620            .await
621            .map(|result| result.response)
622    }
623
624    pub(crate) async fn send_to_device(
625        &self,
626        request: &ToDeviceRequest,
627    ) -> HttpResult<ToDeviceResponse> {
628        let request = RumaToDeviceRequest::new_raw(
629            request.event_type.clone(),
630            request.txn_id.clone(),
631            request.messages.clone(),
632        );
633
634        self.send(request).await
635    }
636
637    pub(crate) async fn send_verification_request(
638        &self,
639        request: OutgoingVerificationRequest,
640    ) -> Result<()> {
641        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
642
643        match request {
644            ToDevice(t) => {
645                self.send_to_device(&t).await?;
646            }
647            InRoom(r) => {
648                self.room_send_helper(&r).await?;
649            }
650        }
651
652        Ok(())
653    }
654
655    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
656        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
657
658        match r.request() {
659            AnyOutgoingRequest::KeysQuery(request) => {
660                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
661            }
662            AnyOutgoingRequest::KeysUpload(request) => {
663                let response = self.keys_upload(r.request_id(), request).await;
664
665                if let Err(e) = &response {
666                    match e.as_ruma_api_error() {
667                        Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
668                            if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
669                            {
670                                // This is one of the nastiest errors we can have. The server
671                                // telling us that we already have a one-time key uploaded means
672                                // that we forgot about some of our one-time keys. This will lead to
673                                // UTDs.
674                                {
675                                    let already_reported = self
676                                        .state_store()
677                                        .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
678                                        .await?
679                                        .is_some();
680
681                                    if message.starts_with("One time key") && !already_reported {
682                                        let error_message =
683                                            DuplicateOneTimeKeyErrorMessage::from_str(message);
684
685                                        if let Ok(message) = &error_message {
686                                            error!(
687                                                sentry = true,
688                                                old_key = %message.old_key,
689                                                new_key = %message.new_key,
690                                                "Duplicate one-time keys have been uploaded"
691                                            );
692                                        } else {
693                                            error!(
694                                                sentry = true,
695                                                "Duplicate one-time keys have been uploaded"
696                                            );
697                                        }
698
699                                        self.state_store()
700                                            .set_kv_data(
701                                                StateStoreDataKey::OneTimeKeyAlreadyUploaded,
702                                                StateStoreDataValue::OneTimeKeyAlreadyUploaded,
703                                            )
704                                            .await?;
705
706                                        if let Err(e) = self
707                                            .inner
708                                            .duplicate_key_upload_error_sender
709                                            .send(error_message.ok())
710                                        {
711                                            error!(
712                                                "Failed to dispatch duplicate key upload error notification: {}",
713                                                e
714                                            );
715                                        }
716                                    }
717                                }
718                            }
719                        }
720                        _ => {}
721                    }
722
723                    response?;
724                }
725            }
726            AnyOutgoingRequest::ToDeviceRequest(request) => {
727                let response = self.send_to_device(request).await?;
728                self.mark_request_as_sent(r.request_id(), &response).await?;
729            }
730            AnyOutgoingRequest::SignatureUpload(request) => {
731                let response = self.send(request.clone()).await?;
732                self.mark_request_as_sent(r.request_id(), &response).await?;
733            }
734            AnyOutgoingRequest::RoomMessage(request) => {
735                let response = self.room_send_helper(request).await?;
736                self.mark_request_as_sent(r.request_id(), &response).await?;
737            }
738            AnyOutgoingRequest::KeysClaim(request) => {
739                let response = self.send(request.clone()).await?;
740                self.mark_request_as_sent(r.request_id(), &response).await?;
741            }
742        }
743
744        Ok(())
745    }
746
747    #[instrument(skip_all)]
748    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
749        const MAX_CONCURRENT_REQUESTS: usize = 20;
750
751        // This is needed because sometimes we need to automatically
752        // claim some one-time keys to unwedge an existing Olm session.
753        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
754            warn!("Error while claiming one-time keys {:?}", e);
755        }
756
757        let outgoing_requests = stream::iter(
758            self.olm_machine()
759                .await
760                .as_ref()
761                .ok_or(Error::NoOlmMachine)?
762                .outgoing_requests()
763                .await?,
764        )
765        .map(|r| self.send_outgoing_request(r));
766
767        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
768
769        requests
770            .for_each(|r| async move {
771                match r {
772                    Ok(_) => (),
773                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
774                }
775            })
776            .await;
777
778        Ok(())
779    }
780}
781
782#[cfg(any(feature = "testing", test))]
783impl Client {
784    /// Get the olm machine, for testing purposes only.
785    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
786        self.olm_machine().await
787    }
788}
789
790/// A high-level API to manage the client's encryption.
791///
792/// To get this, use [`Client::encryption()`].
793#[derive(Debug, Clone)]
794pub struct Encryption {
795    /// The underlying client.
796    client: Client,
797}
798
799impl Encryption {
800    pub(crate) fn new(client: Client) -> Self {
801        Self { client }
802    }
803
804    /// Returns the current encryption settings for this client.
805    pub(crate) fn settings(&self) -> EncryptionSettings {
806        self.client.inner.e2ee.encryption_settings
807    }
808
809    /// Get the public ed25519 key of our own device. This is usually what is
810    /// called the fingerprint of the device.
811    pub async fn ed25519_key(&self) -> Option<String> {
812        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
813    }
814
815    /// Get the public Curve25519 key of our own device.
816    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
817        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
818    }
819
820    /// Get the current device creation timestamp.
821    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
822        match self.get_own_device().await {
823            Ok(Some(device)) => device.first_time_seen_ts(),
824            // Should not happen, there should always be an own device
825            _ => MilliSecondsSinceUnixEpoch::now(),
826        }
827    }
828
829    pub(crate) async fn import_secrets_bundle(
830        &self,
831        bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
832    ) -> Result<(), SecretImportError> {
833        let olm_machine = self.client.olm_machine().await;
834        let olm_machine =
835            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
836
837        olm_machine.store().import_secrets_bundle(bundle).await
838    }
839
840    /// Get the status of the private cross signing keys.
841    ///
842    /// This can be used to check which private cross signing keys we have
843    /// stored locally.
844    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
845        let olm = self.client.olm_machine().await;
846        let machine = olm.as_ref()?;
847        Some(machine.cross_signing_status().await)
848    }
849
850    /// Does the user have other devices that the current device can verify
851    /// against?
852    ///
853    /// The device must be signed by the user's cross-signing key, must have an
854    /// identity, and must not be a dehydrated device.
855    pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
856        let olm_machine = self.client.olm_machine().await;
857        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
858        let user_id = olm_machine.user_id();
859
860        self.ensure_initial_key_query().await?;
861
862        let devices = self.get_user_devices(user_id).await?;
863
864        let ret = devices.devices().any(|device| {
865            device.is_cross_signed_by_owner()
866                && device.curve25519_key().is_some()
867                && !device.is_dehydrated()
868        });
869
870        Ok(ret)
871    }
872
873    /// Get all the tracked users we know about
874    ///
875    /// Tracked users are users for which we keep the device list of E2EE
876    /// capable devices up to date.
877    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
878        if let Some(machine) = self.client.olm_machine().await.as_ref() {
879            machine.tracked_users().await
880        } else {
881            Ok(HashSet::new())
882        }
883    }
884
885    /// Get a [`Subscriber`] for the [`VerificationState`].
886    ///
887    /// # Examples
888    ///
889    /// ```no_run
890    /// use matrix_sdk::{Client, encryption};
891    /// use url::Url;
892    ///
893    /// # async {
894    /// let homeserver = Url::parse("http://example.com")?;
895    /// let client = Client::new(homeserver).await?;
896    /// let mut subscriber = client.encryption().verification_state();
897    ///
898    /// let current_value = subscriber.get();
899    ///
900    /// println!("The current verification state is: {current_value:?}");
901    ///
902    /// if let Some(verification_state) = subscriber.next().await {
903    ///     println!("Received verification state update {:?}", verification_state)
904    /// }
905    /// # anyhow::Ok(()) };
906    /// ```
907    pub fn verification_state(&self) -> Subscriber<VerificationState> {
908        self.client.inner.verification_state.subscribe_reset()
909    }
910
911    /// Get a verification object with the given flow id.
912    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
913        let olm = self.client.olm_machine().await;
914        let olm = olm.as_ref()?;
915        #[allow(clippy::bind_instead_of_map)]
916        olm.get_verification(user_id, flow_id).and_then(|v| match v {
917            matrix_sdk_base::crypto::Verification::SasV1(sas) => {
918                Some(SasVerification { inner: sas, client: self.client.clone() }.into())
919            }
920            #[cfg(feature = "qrcode")]
921            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
922                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
923            }
924            _ => None,
925        })
926    }
927
928    /// Get a `VerificationRequest` object for the given user with the given
929    /// flow id.
930    pub async fn get_verification_request(
931        &self,
932        user_id: &UserId,
933        flow_id: impl AsRef<str>,
934    ) -> Option<VerificationRequest> {
935        let olm = self.client.olm_machine().await;
936        let olm = olm.as_ref()?;
937
938        olm.get_verification_request(user_id, flow_id)
939            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
940    }
941
942    /// Get a specific device of a user.
943    ///
944    /// # Arguments
945    ///
946    /// * `user_id` - The unique id of the user that the device belongs to.
947    ///
948    /// * `device_id` - The unique id of the device.
949    ///
950    /// Returns a `Device` if one is found and the crypto store didn't throw an
951    /// error.
952    ///
953    /// This will always return None if the client hasn't been logged in.
954    ///
955    /// # Examples
956    ///
957    /// ```no_run
958    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
959    /// # use url::Url;
960    /// # async {
961    /// # let alice = user_id!("@alice:example.org");
962    /// # let homeserver = Url::parse("http://example.com")?;
963    /// # let client = Client::new(homeserver).await?;
964    /// if let Some(device) =
965    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
966    /// {
967    ///     println!("{:?}", device.is_verified());
968    ///
969    ///     if !device.is_verified() {
970    ///         let verification = device.request_verification().await?;
971    ///     }
972    /// }
973    /// # anyhow::Ok(()) };
974    /// ```
975    pub async fn get_device(
976        &self,
977        user_id: &UserId,
978        device_id: &DeviceId,
979    ) -> Result<Option<Device>, CryptoStoreError> {
980        let olm = self.client.olm_machine().await;
981        let Some(machine) = olm.as_ref() else { return Ok(None) };
982        let device = machine.get_device(user_id, device_id, None).await?;
983        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
984    }
985
986    /// A convenience method to retrieve your own device from the store.
987    ///
988    /// This is the same as calling [`Encryption::get_device()`] with your own
989    /// user and device ID.
990    ///
991    /// This will always return a device, unless you are not logged in.
992    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
993        let olm = self.client.olm_machine().await;
994        let Some(machine) = olm.as_ref() else { return Ok(None) };
995        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
996        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
997    }
998
999    /// Get a map holding all the devices of an user.
1000    ///
1001    /// This will always return an empty map if the client hasn't been logged
1002    /// in.
1003    ///
1004    /// # Arguments
1005    ///
1006    /// * `user_id` - The unique id of the user that the devices belong to.
1007    ///
1008    /// # Examples
1009    ///
1010    /// ```no_run
1011    /// # use matrix_sdk::{Client, ruma::user_id};
1012    /// # use url::Url;
1013    /// # async {
1014    /// # let alice = user_id!("@alice:example.org");
1015    /// # let homeserver = Url::parse("http://example.com")?;
1016    /// # let client = Client::new(homeserver).await?;
1017    /// let devices = client.encryption().get_user_devices(alice).await?;
1018    ///
1019    /// for device in devices.devices() {
1020    ///     println!("{device:?}");
1021    /// }
1022    /// # anyhow::Ok(()) };
1023    /// ```
1024    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1025        let devices = self
1026            .client
1027            .olm_machine()
1028            .await
1029            .as_ref()
1030            .ok_or(Error::NoOlmMachine)?
1031            .get_user_devices(user_id, None)
1032            .await?;
1033
1034        Ok(UserDevices { inner: devices, client: self.client.clone() })
1035    }
1036
1037    /// Get the E2EE identity of a user from the crypto store.
1038    ///
1039    /// Usually, we only have the E2EE identity of a user locally if the user
1040    /// is tracked, meaning that we are both members of the same encrypted room.
1041    ///
1042    /// To get the E2EE identity of a user even if it is not available locally
1043    /// use [`Encryption::request_user_identity()`].
1044    ///
1045    /// # Arguments
1046    ///
1047    /// * `user_id` - The unique id of the user that the identity belongs to.
1048    ///
1049    /// Returns a `UserIdentity` if one is found and the crypto store
1050    /// didn't throw an error.
1051    ///
1052    /// This will always return None if the client hasn't been logged in.
1053    ///
1054    /// # Examples
1055    ///
1056    /// ```no_run
1057    /// # use matrix_sdk::{Client, ruma::user_id};
1058    /// # use url::Url;
1059    /// # async {
1060    /// # let alice = user_id!("@alice:example.org");
1061    /// # let homeserver = Url::parse("http://example.com")?;
1062    /// # let client = Client::new(homeserver).await?;
1063    /// let user = client.encryption().get_user_identity(alice).await?;
1064    ///
1065    /// if let Some(user) = user {
1066    ///     println!("{:?}", user.is_verified());
1067    ///
1068    ///     let verification = user.request_verification().await?;
1069    /// }
1070    /// # anyhow::Ok(()) };
1071    /// ```
1072    pub async fn get_user_identity(
1073        &self,
1074        user_id: &UserId,
1075    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1076        let olm = self.client.olm_machine().await;
1077        let Some(olm) = olm.as_ref() else { return Ok(None) };
1078        let identity = olm.get_identity(user_id, None).await?;
1079
1080        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1081    }
1082
1083    /// Get the E2EE identity of a user from the homeserver.
1084    ///
1085    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
1086    /// E2EE identity is not found, it should mean that the user did not set
1087    /// up cross-signing.
1088    ///
1089    /// If you want the E2EE identity of a user without making a request to the
1090    /// homeserver, use [`Encryption::get_user_identity()`] instead.
1091    ///
1092    /// # Arguments
1093    ///
1094    /// * `user_id` - The ID of the user that the identity belongs to.
1095    ///
1096    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
1097    /// was an issue with the crypto store or with the request to the
1098    /// homeserver.
1099    ///
1100    /// This will always return `None` if the client hasn't been logged in.
1101    ///
1102    /// # Examples
1103    ///
1104    /// ```no_run
1105    /// # use matrix_sdk::{Client, ruma::user_id};
1106    /// # use url::Url;
1107    /// # async {
1108    /// # let alice = user_id!("@alice:example.org");
1109    /// # let homeserver = Url::parse("http://example.com")?;
1110    /// # let client = Client::new(homeserver).await?;
1111    /// let user = client.encryption().request_user_identity(alice).await?;
1112    ///
1113    /// if let Some(user) = user {
1114    ///     println!("User is verified: {:?}", user.is_verified());
1115    ///
1116    ///     let verification = user.request_verification().await?;
1117    /// }
1118    /// # anyhow::Ok(()) };
1119    /// ```
1120    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1121        let olm = self.client.olm_machine().await;
1122        let Some(olm) = olm.as_ref() else { return Ok(None) };
1123
1124        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1125        self.client.keys_query(&request_id, request.device_keys).await?;
1126
1127        let identity = olm.get_identity(user_id, None).await?;
1128        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1129    }
1130
1131    /// Returns a stream of device updates, allowing users to listen for
1132    /// notifications about new or changed devices.
1133    ///
1134    /// The stream produced by this method emits updates whenever a new device
1135    /// is discovered or when an existing device's information is changed. Users
1136    /// can subscribe to this stream and receive updates in real-time.
1137    ///
1138    /// # Examples
1139    ///
1140    /// ```no_run
1141    /// # use matrix_sdk::Client;
1142    /// # use ruma::{device_id, user_id};
1143    /// # use futures_util::{pin_mut, StreamExt};
1144    /// # let client: Client = unimplemented!();
1145    /// # async {
1146    /// let devices_stream = client.encryption().devices_stream().await?;
1147    /// let user_id = client
1148    ///     .user_id()
1149    ///     .expect("We should know our user id after we have logged in");
1150    /// pin_mut!(devices_stream);
1151    ///
1152    /// for device_updates in devices_stream.next().await {
1153    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1154    ///         for device in user_devices.values() {
1155    ///             println!("A new device has been added {}", device.device_id());
1156    ///         }
1157    ///     }
1158    /// }
1159    /// # anyhow::Ok(()) };
1160    /// ```
1161    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1162        let olm = self.client.olm_machine().await;
1163        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1164        let client = self.client.to_owned();
1165
1166        Ok(olm
1167            .store()
1168            .devices_stream()
1169            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1170    }
1171
1172    /// Returns a stream of user identity updates, allowing users to listen for
1173    /// notifications about new or changed user identities.
1174    ///
1175    /// The stream produced by this method emits updates whenever a new user
1176    /// identity is discovered or when an existing identities information is
1177    /// changed. Users can subscribe to this stream and receive updates in
1178    /// real-time.
1179    ///
1180    /// # Examples
1181    ///
1182    /// ```no_run
1183    /// # use matrix_sdk::Client;
1184    /// # use ruma::{device_id, user_id};
1185    /// # use futures_util::{pin_mut, StreamExt};
1186    /// # let client: Client = unimplemented!();
1187    /// # async {
1188    /// let identities_stream =
1189    ///     client.encryption().user_identities_stream().await?;
1190    /// pin_mut!(identities_stream);
1191    ///
1192    /// for identity_updates in identities_stream.next().await {
1193    ///     for (_, identity) in identity_updates.new {
1194    ///         println!("A new identity has been added {}", identity.user_id());
1195    ///     }
1196    /// }
1197    /// # anyhow::Ok(()) };
1198    /// ```
1199    pub async fn user_identities_stream(
1200        &self,
1201    ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1202        let olm = self.client.olm_machine().await;
1203        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1204        let client = self.client.to_owned();
1205
1206        Ok(olm
1207            .store()
1208            .user_identities_stream()
1209            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1210    }
1211
1212    /// Create and upload a new cross signing identity.
1213    ///
1214    /// # Arguments
1215    ///
1216    /// * `auth_data` - This request requires user interactive auth, the first
1217    ///   request needs to set this to `None` and will always fail with an
1218    ///   `UiaaResponse`. The response will contain information for the
1219    ///   interactive auth and the same request needs to be made but this time
1220    ///   with some `auth_data` provided.
1221    ///
1222    /// # Examples
1223    ///
1224    /// ```no_run
1225    /// # use std::collections::BTreeMap;
1226    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1227    /// # use url::Url;
1228    /// # use serde_json::json;
1229    /// # async {
1230    /// # let homeserver = Url::parse("http://example.com")?;
1231    /// # let client = Client::new(homeserver).await?;
1232    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1233    ///     if let Some(response) = e.as_uiaa_response() {
1234    ///         let mut password = uiaa::Password::new(
1235    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1236    ///             "wordpass".to_owned(),
1237    ///         );
1238    ///         password.session = response.session.clone();
1239    ///
1240    ///         client
1241    ///             .encryption()
1242    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1243    ///             .await
1244    ///             .expect("Couldn't bootstrap cross signing")
1245    ///     } else {
1246    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1247    ///     }
1248    /// }
1249    /// # anyhow::Ok(()) };
1250    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1251        let olm = self.client.olm_machine().await;
1252        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1253
1254        let CrossSigningBootstrapRequests {
1255            upload_signing_keys_req,
1256            upload_keys_req,
1257            upload_signatures_req,
1258        } = olm.bootstrap_cross_signing(false).await?;
1259
1260        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1261            auth: auth_data,
1262            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1263            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1264            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1265        });
1266
1267        if let Some(req) = upload_keys_req {
1268            self.client.send_outgoing_request(req).await?;
1269        }
1270        self.client.send(upload_signing_keys_req).await?;
1271        self.client.send(upload_signatures_req).await?;
1272
1273        Ok(())
1274    }
1275
1276    /// Reset the cross-signing keys.
1277    ///
1278    /// # Example
1279    ///
1280    /// ```no_run
1281    /// # use matrix_sdk::{ruma::api::client::uiaa, Client, encryption::CrossSigningResetAuthType};
1282    /// # use url::Url;
1283    /// # async {
1284    /// # let homeserver = Url::parse("http://example.com")?;
1285    /// # let client = Client::new(homeserver).await?;
1286    /// # let user_id = unimplemented!();
1287    /// let encryption = client.encryption();
1288    ///
1289    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1290    ///     match handle.auth_type() {
1291    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1292    ///             use matrix_sdk::ruma::api::client::uiaa;
1293    ///
1294    ///             let password = "1234".to_owned();
1295    ///             let mut password = uiaa::Password::new(user_id, password);
1296    ///             password.session = uiaa.session;
1297    ///
1298    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1299    ///         }
1300    ///         CrossSigningResetAuthType::OAuth(o) => {
1301    ///             println!(
1302    ///                 "To reset your end-to-end encryption cross-signing identity, \
1303    ///                 you first need to approve it at {}",
1304    ///                 o.approval_url
1305    ///             );
1306    ///             handle.auth(None).await?;
1307    ///         }
1308    ///     }
1309    /// }
1310    /// # anyhow::Ok(()) };
1311    /// ```
1312    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1313        let olm = self.client.olm_machine().await;
1314        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1315
1316        let CrossSigningBootstrapRequests {
1317            upload_keys_req,
1318            upload_signing_keys_req,
1319            upload_signatures_req,
1320        } = olm.bootstrap_cross_signing(true).await?;
1321
1322        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1323            auth: None,
1324            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1325            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1326            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1327        });
1328
1329        if let Some(req) = upload_keys_req {
1330            self.client.send_outgoing_request(req).await?;
1331        }
1332
1333        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1334            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1335                let client = self.client.clone();
1336
1337                Ok(Some(CrossSigningResetHandle::new(
1338                    client,
1339                    upload_signing_keys_req,
1340                    upload_signatures_req,
1341                    auth_type,
1342                )))
1343            } else {
1344                Err(error.into())
1345            }
1346        } else {
1347            self.client.send(upload_signatures_req).await?;
1348
1349            Ok(None)
1350        }
1351    }
1352
1353    /// Query the user's own device keys, if, and only if, we didn't have their
1354    /// identity in the first place.
1355    async fn ensure_initial_key_query(&self) -> Result<()> {
1356        let olm_machine = self.client.olm_machine().await;
1357        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1358
1359        let user_id = olm_machine.user_id();
1360
1361        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1362            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1363            self.client.keys_query(&request_id, request.device_keys).await?;
1364        }
1365
1366        Ok(())
1367    }
1368
1369    /// Create and upload a new cross signing identity, if that has not been
1370    /// done yet.
1371    ///
1372    /// This will only create a new cross-signing identity if the user had never
1373    /// done it before. If the user did it before, then this is a no-op.
1374    ///
1375    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1376    /// behavior of this function.
1377    ///
1378    /// # Arguments
1379    ///
1380    /// * `auth_data` - This request requires user interactive auth, the first
1381    ///   request needs to set this to `None` and will always fail with an
1382    ///   `UiaaResponse`. The response will contain information for the
1383    ///   interactive auth and the same request needs to be made but this time
1384    ///   with some `auth_data` provided.
1385    ///
1386    /// # Examples
1387    /// ```no_run
1388    /// # use std::collections::BTreeMap;
1389    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1390    /// # use url::Url;
1391    /// # use serde_json::json;
1392    /// # async {
1393    /// # let homeserver = Url::parse("http://example.com")?;
1394    /// # let client = Client::new(homeserver).await?;
1395    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1396    ///     if let Some(response) = e.as_uiaa_response() {
1397    ///         let mut password = uiaa::Password::new(
1398    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1399    ///             "wordpass".to_owned(),
1400    ///         );
1401    ///         password.session = response.session.clone();
1402    ///
1403    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1404    ///         // avoid checks.
1405    ///         client
1406    ///             .encryption()
1407    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1408    ///             .await
1409    ///             .expect("Couldn't bootstrap cross signing")
1410    ///     } else {
1411    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1412    ///     }
1413    /// }
1414    /// # anyhow::Ok(()) };
1415    pub async fn bootstrap_cross_signing_if_needed(
1416        &self,
1417        auth_data: Option<AuthData>,
1418    ) -> Result<()> {
1419        let olm_machine = self.client.olm_machine().await;
1420        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1421        let user_id = olm_machine.user_id();
1422
1423        self.ensure_initial_key_query().await?;
1424
1425        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1426            self.bootstrap_cross_signing(auth_data).await?;
1427        }
1428
1429        Ok(())
1430    }
1431
1432    /// Export E2EE keys that match the given predicate encrypting them with the
1433    /// given passphrase.
1434    ///
1435    /// # Arguments
1436    ///
1437    /// * `path` - The file path where the exported key file will be saved.
1438    ///
1439    /// * `passphrase` - The passphrase that will be used to encrypt the
1440    ///   exported room keys.
1441    ///
1442    /// * `predicate` - A closure that will be called for every known
1443    ///   `InboundGroupSession`, which represents a room key. If the closure
1444    ///   returns `true` the `InboundGroupSessoin` will be included in the
1445    ///   export, if the closure returns `false` it will not be included.
1446    ///
1447    /// # Panics
1448    ///
1449    /// This method will panic if it isn't run on a Tokio runtime.
1450    ///
1451    /// This method will panic if it can't get enough randomness from the OS to
1452    /// encrypt the exported keys securely.
1453    ///
1454    /// # Examples
1455    ///
1456    /// ```no_run
1457    /// # use std::{path::PathBuf, time::Duration};
1458    /// # use matrix_sdk::{
1459    /// #     Client, config::SyncSettings,
1460    /// #     ruma::room_id,
1461    /// # };
1462    /// # use url::Url;
1463    /// # async {
1464    /// # let homeserver = Url::parse("http://localhost:8080")?;
1465    /// # let mut client = Client::new(homeserver).await?;
1466    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1467    /// // Export all room keys.
1468    /// client
1469    ///     .encryption()
1470    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1471    ///     .await?;
1472    ///
1473    /// // Export only the room keys for a certain room.
1474    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1475    /// let room_id = room_id!("!test:localhost");
1476    ///
1477    /// client
1478    ///     .encryption()
1479    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1480    ///     .await?;
1481    /// # anyhow::Ok(()) };
1482    /// ```
1483    #[cfg(not(target_family = "wasm"))]
1484    pub async fn export_room_keys(
1485        &self,
1486        path: PathBuf,
1487        passphrase: &str,
1488        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1489    ) -> Result<()> {
1490        let olm = self.client.olm_machine().await;
1491        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1492
1493        let keys = olm.store().export_room_keys(predicate).await?;
1494        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1495
1496        let encrypt = move || -> Result<()> {
1497            let export: String =
1498                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1499            let mut file = std::fs::File::create(path)?;
1500            file.write_all(&export.into_bytes())?;
1501            Ok(())
1502        };
1503
1504        let task = tokio::task::spawn_blocking(encrypt);
1505        task.await.expect("Task join error")
1506    }
1507
1508    /// Import E2EE keys from the given file path.
1509    ///
1510    /// # Arguments
1511    ///
1512    /// * `path` - The file path where the exported key file will can be found.
1513    ///
1514    /// * `passphrase` - The passphrase that should be used to decrypt the
1515    ///   exported room keys.
1516    ///
1517    /// Returns a tuple of numbers that represent the number of sessions that
1518    /// were imported and the total number of sessions that were found in the
1519    /// key export.
1520    ///
1521    /// # Panics
1522    ///
1523    /// This method will panic if it isn't run on a Tokio runtime.
1524    ///
1525    /// ```no_run
1526    /// # use std::{path::PathBuf, time::Duration};
1527    /// # use matrix_sdk::{
1528    /// #     Client, config::SyncSettings,
1529    /// #     ruma::room_id,
1530    /// # };
1531    /// # use url::Url;
1532    /// # async {
1533    /// # let homeserver = Url::parse("http://localhost:8080")?;
1534    /// # let mut client = Client::new(homeserver).await?;
1535    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1536    /// let result =
1537    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1538    ///
1539    /// println!(
1540    ///     "Imported {} room keys out of {}",
1541    ///     result.imported_count, result.total_count
1542    /// );
1543    /// # anyhow::Ok(()) };
1544    /// ```
1545    #[cfg(not(target_family = "wasm"))]
1546    pub async fn import_room_keys(
1547        &self,
1548        path: PathBuf,
1549        passphrase: &str,
1550    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1551        let olm = self.client.olm_machine().await;
1552        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1553        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1554
1555        let decrypt = move || {
1556            let file = std::fs::File::open(path)?;
1557            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1558        };
1559
1560        let task = tokio::task::spawn_blocking(decrypt);
1561        let import = task.await.expect("Task join error")?;
1562
1563        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1564
1565        self.backups().maybe_trigger_backup();
1566
1567        Ok(ret)
1568    }
1569
1570    /// Receive notifications of room keys being received as a [`Stream`].
1571    ///
1572    /// Each time a room key is updated in any way, an update will be sent to
1573    /// the stream. Updates that happen at the same time are batched into a
1574    /// [`Vec`].
1575    ///
1576    /// If the reader of the stream lags too far behind, an error is broadcast
1577    /// containing the number of skipped items.
1578    ///
1579    /// # Examples
1580    ///
1581    /// ```no_run
1582    /// # use matrix_sdk::Client;
1583    /// # use url::Url;
1584    /// # async {
1585    /// # let homeserver = Url::parse("http://example.com")?;
1586    /// # let client = Client::new(homeserver).await?;
1587    /// use futures_util::StreamExt;
1588    ///
1589    /// let Some(mut room_keys_stream) =
1590    ///     client.encryption().room_keys_received_stream().await
1591    /// else {
1592    ///     return Ok(());
1593    /// };
1594    ///
1595    /// while let Some(update) = room_keys_stream.next().await {
1596    ///     println!("Received room keys {update:?}");
1597    /// }
1598    /// # anyhow::Ok(()) };
1599    /// ```
1600    pub async fn room_keys_received_stream(
1601        &self,
1602    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1603    {
1604        let olm = self.client.olm_machine().await;
1605        let olm = olm.as_ref()?;
1606
1607        Some(olm.store().room_keys_received_stream())
1608    }
1609
1610    /// Receive notifications of historic room key bundles as a [`Stream`].
1611    ///
1612    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1613    ///
1614    /// Each time a historic room key bundle was received, an update will be
1615    /// sent to the stream. This stream is useful for informative purposes
1616    /// exclusively, historic room key bundles are handled by the SDK
1617    /// automatically.
1618    ///
1619    /// # Examples
1620    ///
1621    /// ```no_run
1622    /// # use matrix_sdk::Client;
1623    /// # use url::Url;
1624    /// # async {
1625    /// # let homeserver = Url::parse("http://example.com")?;
1626    /// # let client = Client::new(homeserver).await?;
1627    /// use futures_util::StreamExt;
1628    ///
1629    /// let Some(mut bundle_stream) =
1630    ///     client.encryption().historic_room_key_stream().await
1631    /// else {
1632    ///     return Ok(());
1633    /// };
1634    ///
1635    /// while let Some(bundle_info) = bundle_stream.next().await {
1636    ///     println!("Received a historic room key bundle {bundle_info:?}");
1637    /// }
1638    /// # anyhow::Ok(()) };
1639    /// ```
1640    pub async fn historic_room_key_stream(
1641        &self,
1642    ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1643        let olm = self.client.olm_machine().await;
1644        let olm = olm.as_ref()?;
1645
1646        Some(olm.store().historic_room_key_stream())
1647    }
1648
1649    /// Get the secret storage manager of the client.
1650    pub fn secret_storage(&self) -> SecretStorage {
1651        SecretStorage { client: self.client.to_owned() }
1652    }
1653
1654    /// Get the backups manager of the client.
1655    pub fn backups(&self) -> Backups {
1656        Backups { client: self.client.to_owned() }
1657    }
1658
1659    /// Get the recovery manager of the client.
1660    pub fn recovery(&self) -> Recovery {
1661        Recovery { client: self.client.to_owned() }
1662    }
1663
1664    /// Enables the crypto-store cross-process lock.
1665    ///
1666    /// This may be required if there are multiple processes that may do writes
1667    /// to the same crypto store. In that case, it's necessary to create a
1668    /// lock, so that only one process writes to it, otherwise this may
1669    /// cause confusing issues because of stale data contained in in-memory
1670    /// caches.
1671    ///
1672    /// The provided `lock_value` must be a unique identifier for this process.
1673    /// Use [`Client::cross_process_lock_config`] to get the global value, if
1674    /// multi-process is enabled.
1675    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1676        // If the lock has already been created, don't recreate it from scratch.
1677        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1678            let prev_holder = prev_lock.lock_holder();
1679            if prev_holder.is_some() && prev_holder.unwrap() == lock_value {
1680                return Ok(());
1681            }
1682            warn!(
1683                "Recreating cross-process store lock with a different holder value: \
1684                 prev was {prev_holder:?}, new is {lock_value}"
1685            );
1686        }
1687
1688        let olm_machine = self.client.base_client().olm_machine().await;
1689        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1690
1691        let lock = olm_machine.store().create_store_lock(
1692            "cross_process_lock".to_owned(),
1693            CrossProcessLockConfig::multi_process(lock_value.to_owned()),
1694        );
1695
1696        // Gently try to initialize the crypto store generation counter.
1697        //
1698        // If we don't get the lock immediately, then it is already acquired by another
1699        // process, and we'll get to reload next time we acquire the lock.
1700        {
1701            let lock_result = lock.try_lock_once().await?;
1702
1703            if lock_result.is_ok() {
1704                olm_machine
1705                    .initialize_crypto_store_generation(
1706                        &self.client.locks().crypto_store_generation,
1707                    )
1708                    .await?;
1709            }
1710        }
1711
1712        self.client
1713            .locks()
1714            .cross_process_crypto_store_lock
1715            .set(lock)
1716            .map_err(|_| Error::BadCryptoStoreState)?;
1717
1718        Ok(())
1719    }
1720
1721    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1722    /// time.
1723    ///
1724    /// Returns the current generation number.
1725    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1726        let olm_machine_guard = self.client.olm_machine().await;
1727        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1728            let (new_gen, generation_number) = olm_machine
1729                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1730                .await?;
1731            // If the crypto store generation has changed,
1732            if new_gen {
1733                // (get rid of the reference to the current crypto store first)
1734                drop(olm_machine_guard);
1735                // Recreate the OlmMachine.
1736                self.client.base_client().regenerate_olm(None).await?;
1737            }
1738            Ok(generation_number)
1739        } else {
1740            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1741            // been initialised by the time we get here. Ideally we'd panic, or return an
1742            // error, but for now I'm just adding some logging to check if it
1743            // happens, and returning the magic number 0.
1744            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1745            Ok(0)
1746        }
1747    }
1748
1749    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1750    /// spin-waits until the lock is available.
1751    ///
1752    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1753    /// first time.
1754    pub async fn spin_lock_store(
1755        &self,
1756        max_backoff: Option<u32>,
1757    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1758        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1759            let guard = lock
1760                .spin_lock(max_backoff)
1761                .await
1762                .map_err(|err| {
1763                    Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(
1764                        Box::new(err),
1765                    )))
1766                })?
1767                .map_err(|err| Error::CrossProcessLockError(Box::new(err.into())))?;
1768
1769            let generation = self.on_lock_newly_acquired().await?;
1770
1771            Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1772                _guard: guard.into_guard(),
1773                generation,
1774            }))
1775        } else {
1776            Ok(None)
1777        }
1778    }
1779
1780    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1781    /// attempts to lock it once.
1782    ///
1783    /// Returns a guard to the lock, if it was obtained.
1784    pub async fn try_lock_store_once(
1785        &self,
1786    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1787        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1788            let lock_result = lock.try_lock_once().await?;
1789
1790            let Some(guard) = lock_result.ok() else {
1791                return Ok(None);
1792            };
1793
1794            let generation = self.on_lock_newly_acquired().await?;
1795
1796            Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1797                _guard: guard.into_guard(),
1798                generation,
1799            }))
1800        } else {
1801            Ok(None)
1802        }
1803    }
1804
1805    /// Testing purposes only.
1806    #[cfg(any(test, feature = "testing"))]
1807    pub async fn uploaded_key_count(&self) -> Result<u64> {
1808        let olm_machine = self.client.olm_machine().await;
1809        let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1810        Ok(olm_machine.uploaded_key_count().await?)
1811    }
1812
1813    /// Bootstrap encryption and enables event listeners for the E2EE support.
1814    ///
1815    /// Based on the `EncryptionSettings`, this call might:
1816    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1817    /// - Create a key backup if needed (POST `/room_keys/version`)
1818    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1819    ///
1820    /// As part of this process, and if needed, the current device keys would be
1821    /// uploaded to the server, new account data would be added, and cross
1822    /// signing keys and signatures might be uploaded.
1823    ///
1824    /// Should be called once we
1825    /// created a [`OlmMachine`], i.e. after logging in.
1826    ///
1827    /// # Arguments
1828    ///
1829    /// * `auth_data` - Some requests may require re-authentication. To prevent
1830    ///   the user from having to re-enter their password (or use other
1831    ///   methods), we can provide the authentication data here. This is
1832    ///   necessary for uploading cross-signing keys. However, please note that
1833    ///   there is a proposal (MSC3967) to remove this requirement, which would
1834    ///   allow for the initial upload of cross-signing keys without
1835    ///   authentication, rendering this parameter obsolete.
1836    pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1837        // It's fine to be async here as we're only getting the lock protecting the
1838        // `OlmMachine`. Since the lock shouldn't be that contested right after logging
1839        // in we won't delay the login or restoration of the Client.
1840        let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1841            Some(BundleReceiverTask::new(&self.client).await)
1842        } else {
1843            None
1844        };
1845
1846        let mut tasks = self.client.inner.e2ee.tasks.lock();
1847
1848        let this = self.clone();
1849
1850        tasks.setup_e2ee = Some(spawn(async move {
1851            // Update the current state first, so we don't have to wait for the result of
1852            // network requests
1853            this.update_verification_state().await;
1854
1855            if this.settings().auto_enable_cross_signing
1856                && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1857            {
1858                error!("Couldn't bootstrap cross signing {e:?}");
1859            }
1860
1861            if let Err(e) = this.backups().setup_and_resume().await {
1862                error!("Couldn't setup and resume backups {e:?}");
1863            }
1864            if let Err(e) = this.recovery().setup().await {
1865                error!("Couldn't setup and resume recovery {e:?}");
1866            }
1867        }));
1868
1869        tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1870    }
1871
1872    /// Waits for end-to-end encryption initialization tasks to finish, if any
1873    /// was running in the background.
1874    pub async fn wait_for_e2ee_initialization_tasks(&self) {
1875        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1876
1877        if let Some(task) = task
1878            && let Err(err) = task.await
1879        {
1880            warn!("Error when initializing backups: {err}");
1881        }
1882    }
1883
1884    /// Upload the device keys and initial set of one-time keys to the server.
1885    ///
1886    /// This should only be called when the user logs in for the first time,
1887    /// the method will ensure that other devices see our own device as an
1888    /// end-to-end encryption enabled one.
1889    ///
1890    /// **Warning**: Do not use this method if we're already calling
1891    /// [`Client::send_outgoing_request()`]. This method is intended for
1892    /// explicitly uploading the device keys before starting a sync.
1893    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1894        let olm = self.client.olm_machine().await;
1895        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1896
1897        if let Some((request_id, request)) = olm.upload_device_keys().await? {
1898            self.client.keys_upload(&request_id, &request).await?;
1899
1900            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1901            self.client.keys_query(&request_id, request.device_keys).await?;
1902        }
1903
1904        Ok(())
1905    }
1906
1907    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1908        self.recovery().update_state_after_keys_query(response).await;
1909
1910        // Only update the verification_state if our own devices changed
1911        if let Some(user_id) = self.client.user_id() {
1912            let contains_own_device = response.device_keys.contains_key(user_id);
1913
1914            if contains_own_device {
1915                self.update_verification_state().await;
1916            }
1917        }
1918    }
1919
1920    async fn update_verification_state(&self) {
1921        match self.get_own_device().await {
1922            Ok(device) => {
1923                if let Some(device) = device {
1924                    let is_verified = device.is_cross_signed_by_owner();
1925
1926                    if is_verified {
1927                        self.client.inner.verification_state.set(VerificationState::Verified);
1928                    } else {
1929                        self.client.inner.verification_state.set(VerificationState::Unverified);
1930                    }
1931                } else {
1932                    warn!("Couldn't find out own device in the store.");
1933                    self.client.inner.verification_state.set(VerificationState::Unknown);
1934                }
1935            }
1936            Err(error) => {
1937                warn!("Failed retrieving own device: {error}");
1938                self.client.inner.verification_state.set(VerificationState::Unknown);
1939            }
1940        }
1941    }
1942
1943    /// Encrypts then send the given content via the `/sendToDevice` end-point
1944    /// using Olm encryption.
1945    ///
1946    /// If there are a lot of recipient devices multiple `/sendToDevice`
1947    /// requests might be sent out.
1948    ///
1949    /// # Returns
1950    /// A list of failures. The list of devices that couldn't get the messages.
1951    #[cfg(feature = "experimental-send-custom-to-device")]
1952    pub async fn encrypt_and_send_raw_to_device(
1953        &self,
1954        recipient_devices: Vec<&Device>,
1955        event_type: &str,
1956        content: Raw<AnyToDeviceEventContent>,
1957        share_strategy: CollectStrategy,
1958    ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
1959        let users = recipient_devices.iter().map(|device| device.user_id());
1960
1961        // Will claim one-time-key for users that needs it
1962        // TODO: For later optimisation: This will establish missing olm sessions with
1963        // all this users devices, but we just want for some devices.
1964        self.client.claim_one_time_keys(users).await?;
1965
1966        let olm = self.client.olm_machine().await;
1967        let olm = olm.as_ref().expect("Olm machine wasn't started");
1968
1969        let (requests, withhelds) = olm
1970            .encrypt_content_for_devices(
1971                recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
1972                event_type,
1973                &content
1974                    .deserialize_as::<serde_json::Value>()
1975                    .expect("Deserialize as Value will always work"),
1976                share_strategy,
1977            )
1978            .await?;
1979
1980        let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
1981
1982        // Push the withhelds in the failures
1983        withhelds.iter().for_each(|(d, _)| {
1984            failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
1985        });
1986
1987        // TODO: parallelize that? it's already grouping 250 devices per chunk.
1988        for request in requests {
1989            let ruma_request = RumaToDeviceRequest::new_raw(
1990                request.event_type.clone(),
1991                request.txn_id.clone(),
1992                request.messages.clone(),
1993            );
1994
1995            let send_result = self
1996                .client
1997                .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
1998                .await;
1999
2000            // If the sending failed we need to collect the failures to report them
2001            if send_result.is_err() {
2002                // Mark the sending as failed
2003                for (user_id, device_map) in request.messages {
2004                    for device_id in device_map.keys() {
2005                        match device_id {
2006                            DeviceIdOrAllDevices::DeviceId(device_id) => {
2007                                failures.push((user_id.clone(), device_id.to_owned()));
2008                            }
2009                            DeviceIdOrAllDevices::AllDevices => {
2010                                // Cannot happen in this case
2011                            }
2012                        }
2013                    }
2014                }
2015            }
2016        }
2017
2018        Ok(failures)
2019    }
2020}
2021
2022#[cfg(all(test, not(target_family = "wasm")))]
2023mod tests {
2024    use std::{
2025        ops::Not,
2026        str::FromStr,
2027        sync::{
2028            Arc,
2029            atomic::{AtomicBool, Ordering},
2030        },
2031        time::Duration,
2032    };
2033
2034    use matrix_sdk_test::{
2035        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
2036        event_factory::EventFactory,
2037    };
2038    use ruma::{
2039        event_id,
2040        events::{reaction::ReactionEventContent, relation::Annotation},
2041        user_id,
2042    };
2043    use serde_json::json;
2044    use wiremock::{
2045        Mock, MockServer, Request, ResponseTemplate,
2046        matchers::{header, method, path_regex},
2047    };
2048
2049    use crate::{
2050        Client, assert_next_matches_with_timeout,
2051        config::RequestConfig,
2052        encryption::{
2053            DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2054        },
2055        test_utils::{
2056            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2057        },
2058    };
2059
2060    #[async_test]
2061    async fn test_reaction_sending() {
2062        let server = MockServer::start().await;
2063        let client = logged_in_client(Some(server.uri())).await;
2064
2065        let event_id = event_id!("$2:example.org");
2066
2067        Mock::given(method("GET"))
2068            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2069            .and(header("authorization", "Bearer 1234"))
2070            .respond_with(
2071                ResponseTemplate::new(200)
2072                    .set_body_json(EventFactory::new().room_encryption().into_content()),
2073            )
2074            .mount(&server)
2075            .await;
2076
2077        Mock::given(method("PUT"))
2078            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2079            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2080                "event_id": event_id,
2081            })))
2082            .mount(&server)
2083            .await;
2084
2085        let f = EventFactory::new().sender(user_id!("@example:localhost"));
2086        let response = SyncResponseBuilder::default()
2087            .add_joined_room(
2088                JoinedRoomBuilder::default()
2089                    .add_state_event(
2090                        f.member(user_id!("@example:localhost")).display_name("example"),
2091                    )
2092                    .add_state_event(f.default_power_levels())
2093                    .add_state_event(f.room_encryption()),
2094            )
2095            .build_sync_response();
2096
2097        client.base_client().receive_sync_response(response).await.unwrap();
2098
2099        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2100        assert!(
2101            room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2102        );
2103
2104        let event_id = event_id!("$1:example.org");
2105        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2106        room.send(reaction).await.expect("Sending the reaction should not fail");
2107
2108        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2109    }
2110
2111    #[cfg(feature = "sqlite")]
2112    #[async_test]
2113    async fn test_generation_counter_invalidates_olm_machine() {
2114        // Create two clients using the same sqlite database.
2115
2116        use matrix_sdk_base::store::RoomLoadSettings;
2117        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
2118        let session = mock_matrix_session();
2119
2120        let client1 = Client::builder()
2121            .homeserver_url("http://localhost:1234")
2122            .request_config(RequestConfig::new().disable_retry())
2123            .sqlite_store(&sqlite_path, None)
2124            .build()
2125            .await
2126            .unwrap();
2127        client1
2128            .matrix_auth()
2129            .restore_session(session.clone(), RoomLoadSettings::default())
2130            .await
2131            .unwrap();
2132
2133        let client2 = Client::builder()
2134            .homeserver_url("http://localhost:1234")
2135            .request_config(RequestConfig::new().disable_retry())
2136            .sqlite_store(sqlite_path, None)
2137            .build()
2138            .await
2139            .unwrap();
2140        client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2141
2142        // When the lock isn't enabled, any attempt at locking won't return a guard.
2143        let guard = client1.encryption().try_lock_store_once().await.unwrap();
2144        assert!(guard.is_none());
2145
2146        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2147        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2148
2149        // One client can take the lock.
2150        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2151        assert!(acquired1.is_some());
2152
2153        // Keep the olm machine, so we can see if it's changed later, by comparing Arcs.
2154        let initial_olm_machine =
2155            client1.olm_machine().await.clone().expect("must have an olm machine");
2156
2157        // Also enable backup to check that new machine has the same backup keys.
2158        let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
2159            .expect("Can't create new recovery key");
2160        let backup_key = decryption_key.megolm_v1_public_key();
2161        backup_key.set_version("1".to_owned());
2162        initial_olm_machine
2163            .backup_machine()
2164            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2165            .await
2166            .expect("Should save");
2167
2168        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2169
2170        assert!(client1.encryption().backups().are_enabled().await);
2171
2172        // The other client can't take the lock too.
2173        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2174        assert!(acquired2.is_none());
2175
2176        // Now have the first client release the lock,
2177        drop(acquired1);
2178        tokio::time::sleep(Duration::from_millis(100)).await;
2179
2180        // And re-take it.
2181        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2182        assert!(acquired1.is_some());
2183
2184        // In that case, the Olm Machine shouldn't change.
2185        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2186        assert!(initial_olm_machine.same_as(&olm_machine));
2187
2188        // Ok, release again.
2189        drop(acquired1);
2190        tokio::time::sleep(Duration::from_millis(100)).await;
2191
2192        // Client2 can acquire the lock.
2193        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2194        assert!(acquired2.is_some());
2195
2196        // And then release it.
2197        drop(acquired2);
2198        tokio::time::sleep(Duration::from_millis(100)).await;
2199
2200        // Client1 can acquire it again,
2201        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2202        assert!(acquired1.is_some());
2203
2204        // But now its olm machine has been invalidated and thus regenerated!
2205        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2206
2207        assert!(!initial_olm_machine.same_as(&olm_machine));
2208
2209        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2210        assert!(backup_key_new.decryption_key.is_some());
2211        assert_eq!(
2212            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2213            backup_key.to_base64()
2214        );
2215        assert!(client1.encryption().backups().are_enabled().await);
2216    }
2217
2218    #[cfg(feature = "sqlite")]
2219    #[async_test]
2220    async fn test_generation_counter_no_spurious_invalidation() {
2221        // Create two clients using the same sqlite database.
2222
2223        use matrix_sdk_base::store::RoomLoadSettings;
2224        let sqlite_path =
2225            std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2226        let session = mock_matrix_session();
2227
2228        let client = Client::builder()
2229            .homeserver_url("http://localhost:1234")
2230            .request_config(RequestConfig::new().disable_retry())
2231            .sqlite_store(&sqlite_path, None)
2232            .build()
2233            .await
2234            .unwrap();
2235        client
2236            .matrix_auth()
2237            .restore_session(session.clone(), RoomLoadSettings::default())
2238            .await
2239            .unwrap();
2240
2241        let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2242
2243        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2244
2245        // Enabling the lock doesn't update the olm machine.
2246        let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2247        assert!(initial_olm_machine.same_as(&after_enabling_lock));
2248
2249        {
2250            // Simulate that another client hold the lock before.
2251            let client2 = Client::builder()
2252                .homeserver_url("http://localhost:1234")
2253                .request_config(RequestConfig::new().disable_retry())
2254                .sqlite_store(sqlite_path, None)
2255                .build()
2256                .await
2257                .unwrap();
2258            client2
2259                .matrix_auth()
2260                .restore_session(session, RoomLoadSettings::default())
2261                .await
2262                .unwrap();
2263
2264            client2
2265                .encryption()
2266                .enable_cross_process_store_lock("client2".to_owned())
2267                .await
2268                .unwrap();
2269
2270            let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2271            assert!(guard.is_some());
2272
2273            drop(guard);
2274            tokio::time::sleep(Duration::from_millis(100)).await;
2275        }
2276
2277        {
2278            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2279            assert!(acquired.is_some());
2280        }
2281
2282        // Taking the lock the first time will update the olm machine.
2283        let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2284        assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2285
2286        {
2287            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2288            assert!(acquired.is_some());
2289        }
2290
2291        // Re-taking the lock doesn't update the olm machine.
2292        let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2293        assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2294    }
2295
2296    #[async_test]
2297    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2298        // Given a client and a server
2299        let client = no_retry_test_client(None).await;
2300        let server = MockServer::start().await;
2301
2302        // When we subscribe to its verification state
2303        let mut verification_state = client.encryption().verification_state();
2304
2305        // We can get its initial value, and it's Unknown
2306        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2307
2308        // We set up a mocked request to check this endpoint is not called before
2309        // reading the new state
2310        let keys_requested = Arc::new(AtomicBool::new(false));
2311        let inner_bool = keys_requested.clone();
2312
2313        Mock::given(method("GET"))
2314            .and(path_regex(
2315                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2316            ))
2317            .respond_with(move |_req: &Request| {
2318                inner_bool.fetch_or(true, Ordering::SeqCst);
2319                ResponseTemplate::new(200).set_body_json(json!({}))
2320            })
2321            .mount(&server)
2322            .await;
2323
2324        // When the session is initialised and the encryption tasks spawn
2325        set_client_session(&client).await;
2326
2327        // Then we can get an updated value without waiting for any network requests
2328        assert!(keys_requested.load(Ordering::SeqCst).not());
2329        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2330    }
2331
2332    #[test]
2333    fn test_oauth_reset_info_from_uiaa_info() {
2334        let auth_info = json!({
2335            "session": "dummy",
2336            "flows": [
2337                {
2338                    "stages": [
2339                        "org.matrix.cross_signing_reset"
2340                    ]
2341                }
2342            ],
2343            "params": {
2344                "org.matrix.cross_signing_reset": {
2345                    "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2346                }
2347            },
2348            "msg": "To reset..."
2349        });
2350
2351        let auth_info = serde_json::from_value(auth_info)
2352            .expect("We should be able to deserialize the UiaaInfo");
2353        OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2354            .expect("We should be able to fetch the cross-signing reset info from the auth info");
2355    }
2356
2357    #[test]
2358    fn test_duplicate_one_time_key_error_parsing() {
2359        let message = concat!(
2360            r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2361            r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2362            r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2363            r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2364            r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2365            r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2366            r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2367        );
2368        let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2369            .expect("We should be able to parse the error message");
2370
2371        assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2372        assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2373
2374        DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2375            .expect_err("We shouldn't be able to parse an incomplete error message");
2376    }
2377
2378    // Helper function for the test_devices_to_verify_against_* tests.  Make a
2379    // response to a /keys/query request using the given device keys and a
2380    // pre-defined set of cross-signing keys.
2381    fn devices_to_verify_against_keys_query_response(
2382        devices: Vec<serde_json::Value>,
2383    ) -> serde_json::Value {
2384        let device_keys: serde_json::Map<String, serde_json::Value> = devices
2385            .into_iter()
2386            .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2387            .collect();
2388        json!({
2389            "device_keys": {
2390                "@example:localhost": device_keys,
2391            },
2392            "master_keys": {
2393                "@example:localhost": {
2394                    "keys": {
2395                        "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2396                    },
2397                    "usage": ["master"],
2398                    "user_id": "@example:localhost",
2399                },
2400            },
2401            "self_signing_keys": {
2402                "@example:localhost": {
2403                    "keys": {
2404                        "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2405                    },
2406                    "usage": ["self_signing"],
2407                    "user_id": "@example:localhost",
2408                    "signatures": {
2409                        "@example:localhost": {
2410                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2411                        },
2412                    },
2413                },
2414            },
2415            "user_signing_keys": {
2416                "@example:localhost": {
2417                    "keys": {
2418                        "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2419                    },
2420                    "usage": ["user_signing"],
2421                    "user_id": "@example:localhost",
2422                    "signatures": {
2423                        "@example:localhost": {
2424                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2425                        },
2426                    },
2427                },
2428            }
2429        })
2430    }
2431
2432    // The following three tests test that we can detect whether the user has
2433    // other devices that they can verify against under different conditions.
2434    #[async_test]
2435    /// Test that we detect that can't verify against another device if we have
2436    /// no devices.
2437    async fn test_devices_to_verify_against_no_devices() {
2438        let server = MockServer::start().await;
2439        let client = logged_in_client(Some(server.uri())).await;
2440
2441        Mock::given(method("POST"))
2442            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2443            .respond_with(
2444                ResponseTemplate::new(200)
2445                    .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2446            )
2447            .mount(&server)
2448            .await;
2449
2450        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2451    }
2452
2453    #[async_test]
2454    /// Test that we detect that we can verify against another cross-signed
2455    /// regular device.
2456    async fn test_devices_to_verify_against_cross_signed() {
2457        let server = MockServer::start().await;
2458        let client = logged_in_client(Some(server.uri())).await;
2459
2460        Mock::given(method("POST"))
2461            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2462            .respond_with(ResponseTemplate::new(200).set_body_json(
2463                devices_to_verify_against_keys_query_response(vec![
2464                    json!({
2465                        "algorithms": [
2466                            "m.olm.v1.curve25519-aes-sha2",
2467                            "m.megolm.v1.aes-sha2",
2468                        ],
2469                        "user_id": "@example:localhost",
2470                        "device_id": "SIGNEDDEVICE",
2471                        "keys": {
2472                            "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2473                            "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2474                        },
2475                        "signatures": {
2476                            "@example:localhost": {
2477                                "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2478                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2479                            },
2480                        },
2481                    })
2482                ])
2483            ))
2484            .mount(&server)
2485            .await;
2486
2487        assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2488    }
2489
2490    #[async_test]
2491    /// Test that we detect that we can't verify against a dehydrated or
2492    /// unsigned device.
2493    async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2494        let server = MockServer::start().await;
2495        let client = logged_in_client(Some(server.uri())).await;
2496        let user_id = client.user_id().unwrap();
2497        let olm_machine = client.olm_machine().await;
2498        let olm_machine = olm_machine.as_ref().unwrap();
2499
2500        Mock::given(method("POST"))
2501            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2502            .respond_with(ResponseTemplate::new(200).set_body_json(
2503                devices_to_verify_against_keys_query_response(vec![
2504                    json!({
2505                        "algorithms": [
2506                            "m.olm.v1.curve25519-aes-sha2",
2507                            "m.megolm.v1.aes-sha2",
2508                        ],
2509                        "user_id": "@example:localhost",
2510                        "device_id": "DEHYDRATEDDEVICE",
2511                        "keys": {
2512                            "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2513                            "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2514                        },
2515                        "dehydrated": true,
2516                        "signatures": {
2517                            "@example:localhost": {
2518                                "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2519                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2520                            },
2521                        },
2522                    }),
2523                    json!({
2524                        "algorithms": [
2525                            "m.olm.v1.curve25519-aes-sha2",
2526                            "m.megolm.v1.aes-sha2",
2527                        ],
2528                        "user_id": "@example:localhost",
2529                        "device_id": "UNSIGNEDDEVICE",
2530                        "keys": {
2531                            "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2532                            "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2533                        },
2534                        "signatures": {
2535                            "@example:localhost": {
2536                                "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2537                            },
2538                        },
2539                    }),
2540                ])
2541            ))
2542            .mount(&server)
2543            .await;
2544
2545        let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2546        client.keys_query(&request_id, request.device_keys).await.unwrap();
2547
2548        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2549    }
2550}